[jboss-cvs] JBoss Messaging SVN: r5115 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/client/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 14 21:10:44 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-14 21:10:44 -0400 (Tue, 14 Oct 2008)
New Revision: 5115
Modified:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientSession.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/exception/MessagingException.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
Treating largeMessages as files on client side also + client File Message
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-10-14 22:36:10 UTC (rev 5114)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-10-15 01:10:44 UTC (rev 5115)
@@ -111,6 +111,8 @@
ClientMessage createClientMessage(final byte type, final boolean durable);
ClientMessage createClientMessage(final boolean durable);
+
+ FileClientMessage createFileMessage(final boolean durable);
void start() throws MessagingException;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java 2008-10-14 22:36:10 UTC (rev 5114)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java 2008-10-15 01:10:44 UTC (rev 5115)
@@ -25,6 +25,8 @@
import java.io.File;
import java.nio.channels.FileChannel;
+import org.jboss.messaging.core.exception.MessagingException;
+
/**
* A FileClientMessage
*
@@ -34,15 +36,15 @@
*
*
*/
-public interface FileClientMessage
+public interface FileClientMessage extends ClientMessage
{
File getFile();
void setFile(File file);
- FileChannel getChannel() throws Exception;
+ FileChannel getChannel() throws MessagingException;
- void closeChannel() throws Exception;
+ void closeChannel() throws MessagingException;
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-14 22:36:10 UTC (rev 5114)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-15 01:10:44 UTC (rev 5115)
@@ -27,6 +27,7 @@
import org.jboss.messaging.core.client.AcknowledgementHandler;
import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.FileClientMessage;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.impl.MessageImpl;
@@ -333,7 +334,6 @@
if (msg.getEncodeSize() > BIG_PACKAGE_SIZE)
{
sendMessageInChunks(sendBlocking, msg);
-
}
else
{
@@ -425,6 +425,17 @@
pos += bodyLength;
}
+
+ if (msg instanceof FileClientMessage)
+ {
+ try
+ {
+ ((FileClientMessage)msg).closeChannel();
+ }
+ catch (Exception e)
+ {
+ }
+ }
}
private void checkClosed() throws MessagingException
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-14 22:36:10 UTC (rev 5114)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-15 01:10:44 UTC (rev 5115)
@@ -39,6 +39,7 @@
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.FileClientMessage;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
@@ -539,6 +540,11 @@
return new ClientMessageImpl(durable, body);
}
+ public FileClientMessage createFileMessage(final boolean durable)
+ {
+ return new FileClientMessageImpl(durable);
+ }
+
public boolean isClosed()
{
return closed;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java 2008-10-14 22:36:10 UTC (rev 5114)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java 2008-10-15 01:10:44 UTC (rev 5115)
@@ -30,6 +30,7 @@
import java.nio.channels.FileChannel;
import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
@@ -56,8 +57,50 @@
{
super();
}
+
+ public FileClientMessageImpl(boolean durable)
+ {
+ super(durable, null);
+ }
+
+
+
/**
+ * @param type
+ * @param durable
+ * @param expiration
+ * @param timestamp
+ * @param priority
+ * @param body
+ */
+ public FileClientMessageImpl(byte type,
+ boolean durable,
+ long expiration,
+ long timestamp,
+ byte priority,
+ MessagingBuffer body)
+ {
+ super(type, durable, expiration, timestamp, priority, body);
+ // TODO Auto-generated constructor stub
+ }
+
+
+
+ /**
+ * @param type
+ * @param durable
+ * @param body
+ */
+ public FileClientMessageImpl(byte type, boolean durable, MessagingBuffer body)
+ {
+ super(type, durable, body);
+ // TODO Auto-generated constructor stub
+ }
+
+
+
+ /**
* @param deliveryCount
*/
public FileClientMessageImpl(final int deliveryCount)
@@ -98,7 +141,7 @@
try
{
// We open a new channel on getBody.
- // for a better performance, users should be using the channel, instead of reading the file
+ // for a better performance, users should be using the channels when using file
channel = newChannel();
ByteBuffer buffer = ByteBuffer.allocate((int)channel.size());
@@ -124,20 +167,27 @@
}
}
}
-
- /**
- * @return
- * @throws FileNotFoundException
- * @throws IOException
- */
- private FileChannel newChannel() throws FileNotFoundException, IOException
+
+ public synchronized void encodeBody(MessagingBuffer buffer, int start, int size) throws MessagingException
{
- RandomAccessFile randomFile = new RandomAccessFile(getFile(), "rw");
- randomFile.seek(0);
-
- FileChannel channel = randomFile.getChannel();
- return channel;
+ try
+ {
+ FileChannel channel = getChannel();
+
+ ByteBuffer bufferRead = ByteBuffer.allocate(size);
+
+ channel.position(start);
+ channel.read(bufferRead);
+
+ buffer.putBytes(bufferRead.array());
+ }
+ catch (IOException e)
+ {
+ throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
+ }
+
}
+
@Override
public void setBody(final MessagingBuffer body)
@@ -146,7 +196,7 @@
throw new RuntimeException("Not supported");
}
- public synchronized FileChannel getChannel() throws Exception
+ public synchronized FileChannel getChannel() throws MessagingException
{
if (currentChannel == null)
{
@@ -156,18 +206,25 @@
return currentChannel;
}
- public void closeChannel() throws Exception
+ public synchronized void closeChannel() throws MessagingException
{
if (currentChannel != null)
{
- currentChannel.close();
+ try
+ {
+ currentChannel.close();
+ }
+ catch (IOException e)
+ {
+ throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
+ }
currentChannel = null;
}
}
@Override
- public int getBodySize()
+ public synchronized int getBodySize()
{
return (int)file.length();
}
@@ -178,6 +235,27 @@
// Private -------------------------------------------------------
+ /**
+ * @return
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ private FileChannel newChannel() throws MessagingException
+ {
+ try
+ {
+ RandomAccessFile randomFile = new RandomAccessFile(getFile(), "rw");
+ randomFile.seek(0);
+
+ FileChannel channel = randomFile.getChannel();
+ return channel;
+ }
+ catch (IOException e)
+ {
+ throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/exception/MessagingException.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/exception/MessagingException.java 2008-10-14 22:36:10 UTC (rev 5114)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/exception/MessagingException.java 2008-10-15 01:10:44 UTC (rev 5115)
@@ -82,6 +82,13 @@
this.code = code;
}
+ public MessagingException(int code, String msg, Throwable cause)
+ {
+ super(msg, cause);
+
+ this.code = code;
+ }
+
public int getCode()
{
return code;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java 2008-10-14 22:36:10 UTC (rev 5114)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/Message.java 2008-10-15 01:10:44 UTC (rev 5115)
@@ -24,6 +24,7 @@
import java.util.Set;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.SimpleString;
@@ -86,7 +87,7 @@
// Used on Message chunk
- void encodeBody(MessagingBuffer buffer, int start, int size);
+ void encodeBody(MessagingBuffer buffer, int start, int size) throws MessagingException;
void encodeBody(MessagingBuffer buffer);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-10-14 22:36:10 UTC (rev 5114)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-10-15 01:10:44 UTC (rev 5115)
@@ -30,6 +30,7 @@
import java.nio.ByteBuffer;
import java.util.Set;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
@@ -186,12 +187,13 @@
public void encodeBody(MessagingBuffer buffer)
{
- buffer.putInt(body.limit());
- buffer.putBytes(body.array(), 0, body.limit());
+ MessagingBuffer localBody = getBody();
+ buffer.putInt(localBody.limit());
+ buffer.putBytes(localBody.array(), 0, localBody.limit());
}
// Used on Message chunk
- public void encodeBody(MessagingBuffer buffer, int start, int size)
+ public void encodeBody(MessagingBuffer buffer, int start, int size) throws MessagingException
{
buffer.putBytes(body.array(), start, size);
}
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-14 22:36:10 UTC (rev 5114)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-15 01:10:44 UTC (rev 5115)
@@ -23,13 +23,18 @@
package org.jboss.messaging.tests.integration.chunkmessage;
import java.io.File;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import junit.framework.AssertionFailedError;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.FileClientMessage;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.tests.util.ServiceTestBase;
@@ -62,17 +67,50 @@
public void testMessageChunkNullPersistence() throws Exception
{
- testInternal(false);
+ testInternal(false, false, 5000);
}
public void testMessageChunkFilePersistence() throws Exception
{
- testInternal(true);
+ testInternal(true, false, 5000);
}
- public void testInternal(final boolean realFiles) throws Exception
+ public void testSendfileMessage() throws Exception
{
+ testInternal(true, true, 5000);
+ }
+
+ public void testSendfileMessageOnNullPersistence() throws Exception
+ {
+ testInternal(false, true, 5000);
+ }
+
+ public void testSendfileMessageSmallMessage() throws Exception
+ {
+ testInternal(true, true, 4);
+
+ }
+
+ public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
+ {
+ testInternal(false, true, 4);
+ }
+
+ public void testSendRegularMessageNullPersistence() throws Exception
+ {
+ testInternal(false, false, 4);
+
+ }
+
+ public void testSendRegularMessagePersistence() throws Exception
+ {
+ testInternal(true, false, 4);
+ }
+
+ public void testInternal(final boolean realFiles, final boolean useFile, final int numberOfIntegers) throws Exception
+ {
+
clearData();
messagingService = createService(realFiles);
@@ -88,9 +126,10 @@
ClientProducer producer = session.createProducer(ADDRESS);
- MessagingBuffer body = new ByteBufferWrapper(ByteBuffer.allocate(DataConstants.SIZE_INT * 150000));
+ ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+ MessagingBuffer body = new ByteBufferWrapper(ioBuffer);
- for (int i = 0; i < 15000; i++)
+ for (int i = 0; i < numberOfIntegers; i++)
{
body.putInt(i);
}
@@ -98,9 +137,33 @@
// printBuffer("body to be sent : " , body);
- ClientMessage message = session.createClientMessage(true);
+ ClientMessage message = null;
- message.setBody(body);
+ if (useFile)
+ {
+ message = session.createFileMessage(true);
+
+ File tmpData = new File(new File(temporaryDir), "someFile.dat");
+
+ RandomAccessFile randomTmp = new RandomAccessFile(tmpData, "rw");
+
+ FileChannel channel = randomTmp.getChannel();
+
+ channel.write(ioBuffer);
+
+ channel.close();
+
+ ((FileClientMessage)message).setFile(tmpData);
+ }
+ else
+ {
+ message = session.createClientMessage(true);
+ message.setBody(body);
+ assertEquals(message.getEncodeSize(), message.getPropertiesEncodeSize() + DataConstants.SIZE_INT + message.getBodySize());
+ System.out.println("encodeSize = " + message.getEncodeSize());
+
+ }
+
producer.send(message);
session.close();
@@ -134,14 +197,32 @@
System.out.println("msg on client = " + message2.getMessageID());
// printBuffer("message received : ", message2.getBody());
+
+
- assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
+ try
+ {
+ assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
+ }
+ catch (AssertionFailedError e)
+ {
+ printBuffer("Expected buffer:", body.array());
+ printBuffer("Arriving buffer:", message2.getBody().array());
+ throw e;
+ }
+
session.close();
}
finally
{
- messagingService.stop();
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
}
}
@@ -161,19 +242,17 @@
// Private -------------------------------------------------------
- public static void printBuffer(final String msg, final MessagingBuffer buffer)
+ public static void printBuffer(final String msg, final byte[] buffer)
{
- buffer.rewind();
+ int size = buffer.length;
- int size = buffer.limit();
-
System.out.print(msg);
for (int i = 0; i < size; i++)
{
- System.out.print(String.format("%1$X", buffer.getByte()));
- if (i % 40 != 0 || i == 0)
+ System.out.print(String.format("%1$X", buffer[i]));
+ if (i + 1 < size && (i % 40 != 0 || i == 0))
{
System.out.print(", ");
}
@@ -183,8 +262,7 @@
System.out.print(msg);
}
}
- buffer.rewind();
-
+ System.out.println();
}
// Inner classes -------------------------------------------------
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java 2008-10-14 22:36:10 UTC (rev 5114)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/remoting/DestroyConsumerTest.java 2008-10-15 01:10:44 UTC (rev 5115)
@@ -53,32 +53,46 @@
MessagingService service = createService(false, false, createDefaultConfig(), new HashMap<String, QueueSettings>());
service.start();
- SimpleString queue = new SimpleString("add1");
+ try
+ {
+
+ SimpleString queue = new SimpleString("add1");
+
+ ClientSessionFactory factory = createInVMFactory();
+
+ ClientSession session = factory.createSession(false, false, false, false);
+
+ session.createQueue(queue, queue, null, false, false);
+
+ ClientConsumer consumer = session.createConsumer(queue);
+
+ session.start();
+
+ Binding binding = service.getServer().getPostOffice().getBindingsForAddress(queue).get(0);
+
+ assertEquals(1, binding.getQueue().getConsumerCount());
+
+ ClientSessionImpl impl = (ClientSessionImpl) session;
+
+ // Simulating a CTRL-C what would close the Socket but not the ClientSession
+ impl.cleanUp();
+
+
+ assertEquals(0, binding.getQueue().getConsumerCount());
- ClientSessionFactory factory = createInVMFactory();
+ }
+ finally
+ {
+ try
+ {
+ service.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
- ClientSession session = factory.createSession(false, false, false, false);
- session.createQueue(queue, queue, null, false, false);
-
- ClientConsumer consumer = session.createConsumer(queue);
-
- session.start();
-
- Binding binding = service.getServer().getPostOffice().getBindingsForAddress(queue).get(0);
-
- assertEquals(1, binding.getQueue().getConsumerCount());
-
- ClientSessionImpl impl = (ClientSessionImpl) session;
-
- // Simulating a CTRL-C what would close the Socket but not the ClientSession
- impl.cleanUp();
-
-
- assertEquals(0, binding.getQueue().getConsumerCount());
-
-
-
}
// Package protected ---------------------------------------------
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2008-10-14 22:36:10 UTC (rev 5114)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2008-10-15 01:10:44 UTC (rev 5115)
@@ -74,6 +74,8 @@
protected String largeMessagesDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/large-msg";
protected String clientLargeMessagesDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/client-large-msg";
+
+ protected String temporaryDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/temporary";
protected MessagingService messagingService;
@@ -90,22 +92,21 @@
protected void clearData()
{
- File file = new File(journalDir);
- File file2 = new File(bindingsDir);
- File file3 = new File(pageDir);
- File file4 = new File(largeMessagesDir);
- File file5 = new File(clientLargeMessagesDir);
+ deleteAndCreateDir(journalDir);
+ deleteAndCreateDir(bindingsDir);
+ deleteAndCreateDir(pageDir);
+ deleteAndCreateDir(largeMessagesDir);
+ deleteAndCreateDir(clientLargeMessagesDir);
+ deleteAndCreateDir(temporaryDir);
+ }
+
+ protected void deleteAndCreateDir(String directory)
+ {
+ File file = new File(directory);
deleteDirectory(file);
file.mkdirs();
- deleteDirectory(file2);
- file2.mkdirs();
- deleteDirectory(file3);
- file3.mkdirs();
- deleteDirectory(file4);
- file4.mkdirs();
- deleteDirectory(file5);
- file5.mkdirs();
}
+
protected MessagingService createService(final boolean realFiles,
final boolean netty,
More information about the jboss-cvs-commits
mailing list