[jboss-cvs] JBoss Messaging SVN: r5185 - in branches/Branch_Chunk_CRS2: src/main/org/jboss/messaging/core/client/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Oct 27 17:12:24 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-27 17:12:24 -0400 (Mon, 27 Oct 2008)
New Revision: 5185
Added:
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
Modified:
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/ClientMessage.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
Log:
Fixing some issues with flow control and huge files
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/ClientMessage.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-10-27 21:12:24 UTC (rev 5185)
@@ -44,4 +44,8 @@
void onReceipt(ClientSessionInternal session, long consumerID);
void acknowledge() throws MessagingException;
+
+ void setLargeMessage(boolean largeMessage);
+
+ boolean isLargeMessage();
}
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-27 21:12:24 UTC (rev 5185)
@@ -13,17 +13,22 @@
package org.jboss.messaging.core.client.impl;
import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.FileClientMessage;
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.Future;
@@ -67,6 +72,8 @@
private File largeMessagesDir;
+ private ClientMessage currentChunkMessage;
+
private volatile Thread receiverThread;
@@ -157,7 +164,11 @@
boolean expired = m.isExpired();
- flowControl(m.getEncodeSize());
+ // Chunk messages will execute the flow control while receiving the chunks
+ if (!m.isLargeMessage())
+ {
+ flowControl(m.getEncodeSize());
+ }
if (expired)
{
@@ -200,12 +211,14 @@
FileClientMessageImpl message = new FileClientMessageImpl();
message.decodeProperties(propertiesBuffer);
message.setFile(new File(this.largeMessagesDir, message.getMessageID() + "-" + this.session.getName() + "-" + this.getID() + ".jbm"));
+ message.setLargeMessage(true);
return message;
}
else
{
ClientMessageImpl message = new ClientMessageImpl();
message.decodeProperties(propertiesBuffer);
+ message.setLargeMessage(true);
return message;
}
}
@@ -312,7 +325,12 @@
boolean expired = message.isExpired();
- flowControl(message.getEncodeSize());
+ // Large (chunked) messages are skipped here: flowControl has already been
+ // called for each chunk as it arrived (see handleChunk)
+ if (!message.isLargeMessage())
+ {
+ flowControl(message.getEncodeSize());
+ }
if (!expired)
{
@@ -340,7 +358,76 @@
notify();
}
}
+
+
+ public void handleChunk(SessionSendChunkMessage chunk) throws Exception
+ {
+ if (closed)
+ {
+ return;
+ }
+
+ flowControl(chunk.getPacketSize());
+
+ if (chunk.getHeader() != null)
+ {
+
+ // The Header only comes on the first message, so a buffer has to be created on the client
+ // to hold either a file or a big message
+ MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
+
+ currentChunkMessage = createFileMessage(header);
+
+ if (currentChunkMessage instanceof FileClientMessage)
+ {
+ FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+ addBytesBody(fileMessage, chunk.getBody());
+ }
+ else
+ {
+ MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
+ currentChunkMessage.setBody(initialBody);
+ }
+ }
+ else
+ {
+ // No header.. this is then a continuation of a previous message
+ ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
+
+ if (currentChunkMessage instanceof FileClientMessage)
+ {
+ FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+ addBytesBody(fileMessage, chunk.getBody());
+ }
+ else
+ {
+ MessagingBuffer currentBody = currentChunkMessage.getBody();
+
+ MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
+
+ newBody.putBytes(currentBody.array());
+ newBody.putBytes(body.array());
+
+ currentChunkMessage.setBody(newBody);
+ }
+ }
+
+ if (!chunk.isContinues())
+ {
+ // Close the file that was being generated
+ if (currentChunkMessage instanceof FileClientMessage)
+ {
+ ((FileClientMessage)currentChunkMessage).closeChannel();
+ }
+ ClientMessage msgToSend = currentChunkMessage;
+ currentChunkMessage = null;
+ handleMessage(msgToSend);
+ }
+
+ }
+
+
public void clear()
{
synchronized (this)
@@ -517,6 +604,13 @@
}
}
+
+ private void addBytesBody(FileClientMessage fileMessage, byte[] body) throws Exception
+ {
+ FileChannel channel = fileMessage.getChannel();
+ channel.write(ByteBuffer.wrap(body));
+ }
+
// Inner classes
// --------------------------------------------------------------------------------
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-10-27 21:12:24 UTC (rev 5185)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
@@ -38,6 +39,8 @@
long getID();
void handleMessage(ClientMessage message) throws Exception;
+
+ void handleChunk(SessionSendChunkMessage chunk) throws Exception;
void clear();
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-10-27 21:12:24 UTC (rev 5185)
@@ -42,6 +42,8 @@
private long consumerID;
private ClientSessionInternal session;
+
+ private boolean largeMessage;
/*
* Constructor for when reading from network
@@ -100,4 +102,22 @@
session.acknowledge(consumerID, messageID);
}
}
+
+ /**
+ * @return the largeMessage
+ */
+ public boolean isLargeMessage()
+ {
+ return largeMessage;
+ }
+
+ /**
+ * @param largeMessage the largeMessage to set
+ */
+ public void setLargeMessage(boolean largeMessage)
+ {
+ this.largeMessage = largeMessage;
+ }
+
+
}
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-27 21:12:24 UTC (rev 5185)
@@ -332,7 +332,12 @@
if (msg.getEncodeSize() > bigMessageSize)
{
- sendMessageInChunks(sendBlocking, msg, scheduledDeliveryTime);
+ // TODO: Message chunks have to be sent blocking until producer flow control is implemented.
+ // When sending really big messages, non-blocking sends could flood the communication channel up to the point
+ // where you run out of memory, or where pings can no longer reach the server in time.
+ // We will need to live with blocking sends until we enable producer flow control
+ sendMessageInChunks(true, msg, scheduledDeliveryTime);
+ //sendMessageInChunks(sendBlocking, msg, scheduledDeliveryTime);
}
else
{
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-27 21:12:24 UTC (rev 5185)
@@ -678,71 +678,15 @@
public void handleReceiveChunk(final long consumerID, final SessionSendChunkMessage chunk) throws Exception
{
- ClientMessage currentChunkMessage;
+ ClientConsumerInternal consumer = consumers.get(consumerID);
- if (chunk.getHeader() != null)
+ if (consumer != null)
{
-
- // The Header only comes on the first message, so a buffer has to be created on the client
- // to hold either a file or a big message
- MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
-
- currentChunkMessage = createLargeMessage(consumerID, header);
-
- if (currentChunkMessage instanceof FileClientMessage)
- {
- FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
- addBytesBody(fileMessage, chunk.getBody());
- }
- else
- {
- MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
- currentChunkMessage.setBody(initialBody);
- }
-
- currentChunk.put(consumerID, currentChunkMessage);
+ consumer.handleChunk(chunk);
}
- else
- {
- // No header.. this is then a continuation of a previous message
- ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
-
- currentChunkMessage = currentChunk.get(consumerID);
-
- if (currentChunkMessage instanceof FileClientMessage)
- {
- FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
- addBytesBody(fileMessage, chunk.getBody());
- }
- else
- {
- MessagingBuffer currentBody = currentChunkMessage.getBody();
-
- MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
-
- newBody.putBytes(currentBody.array());
- newBody.putBytes(body.array());
-
- currentChunkMessage.setBody(newBody);
- }
- }
-
- if (!chunk.isContinues())
- {
- if (currentChunkMessage instanceof FileClientMessage)
- {
- ((FileClientMessage)currentChunkMessage).closeChannel();
- }
- handleReceiveMessage(consumerID, currentChunkMessage);
- }
+
}
- private void addBytesBody(FileClientMessage fileMessage, byte[] body) throws Exception
- {
- FileChannel channel = fileMessage.getChannel();
- channel.write(ByteBuffer.wrap(body));
- }
-
public void receiveProducerCredits(final long producerID, final int credits) throws Exception
{
ClientProducerInternal producer = producers.get(producerID);
@@ -786,18 +730,6 @@
doCleanup();
}
- public ClientMessage createLargeMessage(final long consumerID, final MessagingBuffer header) throws Exception
- {
- ClientConsumerInternal consumer = consumers.get(consumerID);
-
- if (consumer == null)
- {
- throw new IllegalStateException("No Consumer with ID = " + consumerID);
- }
-
- return consumer.createFileMessage(header);
- }
-
// Needs to be synchronized to prevent issues with occurring concurrently with close()
public synchronized boolean handleFailover(final RemotingConnection backupConnection)
{
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-10-27 21:12:24 UTC (rev 5185)
@@ -56,6 +56,4 @@
boolean handleFailover(final RemotingConnection backupConnection);
- ClientMessage createLargeMessage(long consumerID, MessagingBuffer header) throws Exception;
-
}
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-27 21:12:24 UTC (rev 5185)
@@ -26,7 +26,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -36,15 +36,15 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.DelayedResult;
import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
@@ -91,7 +91,7 @@
private final Lock lock = new ReentrantLock();
- private final AtomicInteger availableCredits;
+ private final Semaphore availableCredits;
private boolean started;
@@ -140,7 +140,7 @@
if (enableFlowControl)
{
- availableCredits = new AtomicInteger(0);
+ availableCredits = new Semaphore(0);
}
else
{
@@ -170,7 +170,7 @@
public HandleStatus handle(final MessageReference ref) throws Exception
{
- if (availableCredits != null && availableCredits.get() <= 0)
+ if (availableCredits != null && availableCredits.availablePermits() <= 0)
{
return HandleStatus.BUSY;
}
@@ -203,11 +203,6 @@
return HandleStatus.NO_MATCH;
}
- if (availableCredits != null)
- {
- availableCredits.addAndGet(-message.getEncodeSize());
- }
-
final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
if (!browseOnly)
@@ -232,7 +227,14 @@
{
public void run()
{
- sendChunks((ServerLargeMessage)message);
+ try
+ {
+ sendChunks((ServerLargeMessage)message);
+ }
+ catch (Exception e)
+ {
+ log.error(e);
+ }
}
});
@@ -241,6 +243,11 @@
else
{
+ if (availableCredits != null)
+ {
+ availableCredits.release(message.getEncodeSize());
+ }
+
if (result == null)
{
// Not replicated - just send now
@@ -362,7 +369,11 @@
{
if (availableCredits != null)
{
- int previous = availableCredits.getAndAdd(credits);
+ int previous;
+
+ // previous will be negative only at the first call, hence it is not necessary to make the next call atomic
+ previous = availableCredits.availablePermits();
+ availableCredits.release(credits);
if (previous <= 0 && previous + credits > 0)
{
@@ -454,7 +465,7 @@
* @param message
* @throws MessagingException
*/
- private void sendChunks(ServerLargeMessage message)
+ private void sendChunks(ServerLargeMessage message) throws Exception
{
int headerSize = message.getPropertiesEncodeSize();
@@ -473,6 +484,12 @@
bodyBuffer.array(),
bodyLength < bodySize,
false);
+
+ if (availableCredits != null)
+ {
+ availableCredits.acquire(chunk.getPacketSize());
+ }
+
channel.send(chunk);
for (int pos = bodyLength; pos < bodySize;)
@@ -484,6 +501,11 @@
chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + bodyLength < bodySize, false);
+ if (availableCredits != null)
+ {
+ availableCredits.acquire(chunk.getPacketSize());
+ }
+
channel.send(chunk);
pos += bodyLength;
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-27 16:57:12 UTC (rev 5184)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-27 21:12:24 UTC (rev 5185)
@@ -49,8 +49,6 @@
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnector;
import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnectorFactory;
@@ -83,6 +81,16 @@
// Public --------------------------------------------------------
+ // Validate the functions to create and verify files
+ public void testFiles() throws Exception
+ {
+ clearData();
+
+ File file = createLargeFile("test.tst", 13333);
+
+ checkFileRead(file, 13333);
+ }
+
public void testFailureOnSendingFile() throws Exception
{
clearData();
@@ -192,7 +200,7 @@
public void testMessageChunkNullPersistence() throws Exception
{
- testInternal(false, false, 100, 5000, false, 0);
+ testInternal(false, false, 1, 5000, false, 0);
}
public void testMessageChunkNullPersistenceDelayed() throws Exception
@@ -207,13 +215,12 @@
public void testMessageChunkFilePersistence100M() throws Exception
{
- testInternal(true, true, 1, 268435456, true, 0);
- //testInternal(true, true, 1, 26214400, false, 0);
+ testInternal(true, true, 1, 26214400, false, 0);
}
public void testMessageChunkFilePersistenceDelayed() throws Exception
{
- testInternal(true, false, 1000, 50000, false, 1000);
+ testInternal(true, false, 1, 50000, false, 1000);
}
public void testSendfileMessage() throws Exception
@@ -336,7 +343,12 @@
}
- private void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception
{
clearData();
@@ -482,7 +494,7 @@
ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
MessagingBuffer body = new ByteBufferWrapper(ioBuffer);
- for (int i = 1; i <= numberOfIntegers; i++)
+ for (int i = 0; i < numberOfIntegers; i++)
{
body.putInt(i);
}
@@ -492,7 +504,7 @@
}
- public void testInternal(final boolean realFiles,
+ protected void testInternal(final boolean realFiles,
final boolean useFile,
final int numberOfMessages,
final int numberOfIntegers,
@@ -591,7 +603,7 @@
for (int i = 0; i < numberOfMessages; i++)
{
- ClientMessage message = consumer.receive(1000 + delayDelivery);
+ ClientMessage message = consumer.receive(60000 + delayDelivery);
assertNotNull(message);
@@ -644,10 +656,6 @@
}
}
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
@Override
protected void setUp() throws Exception
{
@@ -658,9 +666,7 @@
{
}
- // Private -------------------------------------------------------
-
- private FileClientMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers) throws Exception
+ protected FileClientMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers) throws Exception
{
FileClientMessage clientMessage = session.createFileMessage(true);
@@ -679,7 +685,7 @@
* @throws FileNotFoundException
* @throws IOException
*/
- private File createLargeFile(String name, final int numberOfIntegers) throws FileNotFoundException, IOException
+ protected File createLargeFile(String name, final int numberOfIntegers) throws FileNotFoundException, IOException
{
File tmpFile = new File(temporaryDir + "/" + name);
@@ -694,16 +700,16 @@
{
if (buffer.position() > 0 && i % 1000 == 0)
{
- buffer.rewind();
+ buffer.flip();
channel.write(buffer);
- buffer.rewind();
+ buffer.clear();
}
buffer.putInt(i);
}
if (buffer.position() > 0)
{
- buffer.rewind();
+ buffer.flip();
channel.write(buffer);
}
@@ -723,7 +729,7 @@
* @throws FileNotFoundException
* @throws IOException
*/
- private void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfIntegers) throws MessagingException,
+ protected void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfIntegers) throws MessagingException,
FileNotFoundException,
IOException
{
@@ -757,28 +763,34 @@
* @throws FileNotFoundException
* @throws IOException
*/
- private void checkFileRead(final File receivedFile, final int numberOfIntegers) throws FileNotFoundException,
+ protected void checkFileRead(final File receivedFile, final int numberOfIntegers) throws FileNotFoundException,
IOException
{
RandomAccessFile random2 = new RandomAccessFile(receivedFile, "r");
FileChannel channel2 = random2.getChannel();
- ByteBuffer buffer2 = ByteBuffer.allocate(4);
+ ByteBuffer buffer2 = ByteBuffer.allocate(1000 * 4);
channel2.position(0l);
-
- for (int i = 0; i < numberOfIntegers; i++)
+
+ for (int i = 0; i < numberOfIntegers;)
{
- buffer2.rewind();
channel2.read(buffer2);
- buffer2.rewind();
+
+ buffer2.flip();
+ for (int j = 0; j < buffer2.limit() / 4; j++, i++)
+ {
+ assertEquals(i, buffer2.getInt());
+ }
- assertEquals(i, buffer2.getInt());
+ buffer2.clear();
}
channel2.close();
}
+ // Private -------------------------------------------------------
+
// Inner classes -------------------------------------------------
}
Added: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java (rev 0)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java 2008-10-27 21:12:24 UTC (rev 5185)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.soak.chunk;
+
+import org.jboss.messaging.tests.integration.chunkmessage.MessageChunkTest;
+
+/**
+ * A MessageChunkSoakTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 27, 2008 3:44:37 PM
+ *
+ *
+ */
+public class MessageChunkSoakTest extends MessageChunkTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ public void testMessageChunkFilePersistence1G() throws Exception
+ {
+ testInternal(true, true, 2, 268435456, false, 0);
+ }
+
+ public void testMessageChunkFilePersistence100M() throws Exception
+ {
+ testInternal(true, true, 10, 26214400, false, 0);
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list