[jboss-cvs] JBoss Messaging SVN: r5114 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/client/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 14 18:36:10 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-14 18:36:10 -0400 (Tue, 14 Oct 2008)
New Revision: 5114
Added:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java
Modified:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientConsumer.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
Treating largeMessages as files on client side also
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientConsumer.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientConsumer.java 2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/ClientConsumer.java 2008-10-14 22:36:10 UTC (rev 5114)
@@ -18,10 +18,12 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.client;
+import java.io.File;
+
import org.jboss.messaging.core.exception.MessagingException;
/**
@@ -30,20 +32,29 @@
* @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
*/
public interface ClientConsumer
-{
- ClientMessage receive() throws MessagingException;
-
+{
+ ClientMessage receive() throws MessagingException;
+
ClientMessage receive(long timeout) throws MessagingException;
-
+
ClientMessage receiveImmediate() throws MessagingException;
-
+
MessageHandler getMessageHandler() throws MessagingException;
void setMessageHandler(MessageHandler handler) throws MessagingException;
-
+
void close() throws MessagingException;
-
- boolean isClosed();
-
- boolean isDirect();
+
+ boolean isClosed();
+
+ boolean isDirect();
+
+ boolean isLargeMessagesAsFiles();
+
+ void setLargeMessagesAsFiles(boolean bvalue);
+
+ File getLargeMessagesDir();
+
+ void setLargeMessagesDir(File directory);
+
}
Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/FileClientMessage.java 2008-10-14 22:36:10 UTC (rev 5114)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.client;
+
+import java.io.File;
+import java.nio.channels.FileChannel;
+
+/**
+ * A FileClientMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 14, 2008 3:21:15 PM
+ *
+ *
+ */
+public interface FileClientMessage
+{
+ File getFile();
+
+ void setFile(File file);
+
+ FileChannel getChannel() throws Exception;
+
+ void closeChannel() throws Exception;
+
+
+}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-14 22:36:10 UTC (rev 5114)
@@ -12,6 +12,8 @@
package org.jboss.messaging.core.client.impl;
+import java.io.File;
+import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
@@ -21,8 +23,10 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.Future;
/**
@@ -63,6 +67,11 @@
private final Runner runner = new Runner();
+ private boolean largeMessagesAsFiles = false;
+
+ private File largeMessagesDir;
+
+
private volatile Thread receiverThread;
private volatile Thread onMessageThread;
@@ -100,6 +109,8 @@
this.clientWindowSize = clientWindowSize;
this.direct = direct;
+
+ this.largeMessagesDir = new File(System.getProperty("user.dir") + File.separator + "jbm-large-messages");
}
// ClientConsumer implementation
@@ -187,6 +198,29 @@
}
}
+
+ public ClientMessage createLargeMessage(MessagingBuffer propertiesBuffer) throws Exception
+ {
+ if (isLargeMessagesAsFiles())
+ {
+ if (!this.largeMessagesDir.exists())
+ {
+ largeMessagesDir.mkdirs();
+ }
+
+ FileClientMessageImpl message = new FileClientMessageImpl();
+ message.decodeProperties(propertiesBuffer);
+ message.setFile(new File(this.largeMessagesDir, message.getMessageID() + "-" + this.getID() + ".jbm"));
+ return message;
+ }
+ else
+ {
+ ClientMessageImpl message = new ClientMessageImpl();
+ message.decodeProperties(propertiesBuffer);
+ return message;
+ }
+ }
+
public ClientMessage receive() throws MessagingException
{
return receive(0);
@@ -255,6 +289,27 @@
return direct;
}
+
+ public boolean isLargeMessagesAsFiles()
+ {
+ return largeMessagesAsFiles;
+ }
+
+ public void setLargeMessagesAsFiles(boolean bvalue)
+ {
+ this.largeMessagesAsFiles = bvalue;
+ }
+
+ public File getLargeMessagesDir()
+ {
+ return this.largeMessagesDir;
+ }
+
+ public void setLargeMessagesDir(File directory)
+ {
+ this.largeMessagesDir = directory;
+ }
+
// ClientConsumerInternal implementation
// --------------------------------------------------------------
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-10-14 22:36:10 UTC (rev 5114)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
*
@@ -51,4 +52,6 @@
void cleanUp() throws Exception;
void failover();
+
+ ClientMessage createLargeMessage(MessagingBuffer propertiesBuffer) throws Exception;
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-10-14 22:36:10 UTC (rev 5114)
@@ -72,7 +72,6 @@
super((byte) 0, durable, 0, System.currentTimeMillis(), (byte)4, body);
}
- /* Only used in testing */
public ClientMessageImpl()
{
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-14 22:36:10 UTC (rev 5114)
@@ -705,7 +705,14 @@
consumer.handleMessage(message);
}
}
+
+ public ClientMessage createLargeMessage(final long consumerID, final MessagingBuffer header) throws Exception
+ {
+ ClientConsumerInternal consumer = consumers.get(consumerID);
+ return consumer.createLargeMessage(header);
+ }
+
public void receiveProducerCredits(final long producerID, final int credits) throws Exception
{
ClientProducerInternal producer = producers.get(producerID);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-10-14 22:36:10 UTC (rev 5114)
@@ -20,6 +20,7 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.SimpleString;
/**
@@ -59,5 +60,7 @@
void handleReceiveMessage(long consumerID, ClientMessage message) throws Exception;
- void handleFailover(final RemotingConnection backupConnection);
+ void handleFailover(RemotingConnection backupConnection);
+
+ ClientMessage createLargeMessage(long consumerID, MessagingBuffer header) throws Exception;
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-14 22:36:10 UTC (rev 5114)
@@ -23,15 +23,21 @@
package org.jboss.messaging.core.client.impl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.FileClientMessage;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
@@ -54,50 +60,55 @@
private static final Logger log = Logger.getLogger(ClientSessionPacketHandler.class);
private final ClientSessionInternal clientSession;
-
+
private Map<Long, ClientMessage> currentChunk = new ConcurrentHashMap<Long, ClientMessage>();
public ClientSessionPacketHandler(final ClientSessionInternal clientSesssion)
- {
+ {
this.clientSession = clientSesssion;
}
-
+
public void handlePacket(final Packet packet)
{
byte type = packet.getType();
-
+
try
{
switch (type)
{
case SESS_RECEIVETOKENS:
{
- SessionProducerFlowCreditMessage message = (SessionProducerFlowCreditMessage) packet;
-
+ SessionProducerFlowCreditMessage message = (SessionProducerFlowCreditMessage)packet;
+
clientSession.receiveProducerCredits(message.getProducerID(), message.getTokens());
-
+
break;
}
case SESS_CHUNK_SEND:
{
System.out.println("received a chunk");
- SessionSendChunkMessage chunk = (SessionSendChunkMessage) packet;
-
+ SessionSendChunkMessage chunk = (SessionSendChunkMessage)packet;
+
ClientMessage currentChunkMessage = null;
-
+
if (chunk.getHeader() != null)
{
MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
- currentChunkMessage = new ClientMessageImpl();
-
- currentChunkMessage.decodeProperties(header);
-
- MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
-
- currentChunkMessage.setBody(initialBody);
-
+ currentChunkMessage = clientSession.createLargeMessage(chunk.getTargetID(), header);
+
+ if (currentChunkMessage instanceof FileClientMessage)
+ {
+ FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+ addBytesBody(fileMessage, chunk.getBody());
+ }
+ else
+ {
+ MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
+ currentChunkMessage.setBody(initialBody);
+ }
+
currentChunk.put(chunk.getTargetID(), currentChunkMessage);
}
else
@@ -105,41 +116,52 @@
ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
currentChunkMessage = currentChunk.get(chunk.getMessageID());
-
- MessagingBuffer currentBody = currentChunkMessage.getBody();
-
- MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
-
- newBody.putBytes(currentBody.array());
- newBody.putBytes(body.array());
-
- currentChunkMessage.setBody(newBody);
+
+ if (currentChunkMessage instanceof FileClientMessage)
+ {
+ FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+ addBytesBody(fileMessage, chunk.getBody());
+ }
+ else
+ {
+ MessagingBuffer currentBody = currentChunkMessage.getBody();
+
+ MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
+
+ newBody.putBytes(currentBody.array());
+ newBody.putBytes(body.array());
+
+ currentChunkMessage.setBody(newBody);
+ }
}
-
+
if (!chunk.isContinues())
{
+ if (currentChunkMessage instanceof FileClientMessage)
+ {
+ ((FileClientMessage)currentChunkMessage).closeChannel();
+ }
clientSession.handleReceiveMessage(chunk.getTargetID(), currentChunkMessage);
}
-
-
+
break;
}
case SESS_RECEIVE_MSG:
{
- SessionReceiveMessage message = (SessionReceiveMessage) packet;
-
+ SessionReceiveMessage message = (SessionReceiveMessage)packet;
+
clientSession.handleReceiveMessage(message.getConsumerID(), message.getClientMessage());
-
+
break;
}
case EXCEPTION:
{
- //TODO - we can provide a means for async exceptions to get back to to client
- //For now we just log it
+ // TODO - we can provide a means for async exceptions to get back to to client
+ // For now we just log it
MessagingExceptionMessage mem = (MessagingExceptionMessage)packet;
-
+
log.error("Received exception asynchronously from server", mem.getException());
-
+
break;
}
default:
@@ -153,4 +175,16 @@
log.error("Failed to handle packet", e);
}
}
+
+ /**
+ * @param fileMessage
+ * @param body
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ private void addBytesBody(FileClientMessage fileMessage, byte[] body) throws Exception
+ {
+ FileChannel channel = fileMessage.getChannel();
+ channel.write(ByteBuffer.wrap(body));
+ }
}
\ No newline at end of file
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java 2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java 2008-10-14 22:36:10 UTC (rev 5114)
@@ -20,9 +20,19 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.core.client.impl;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
/**
* A FileClientMessageImpl
*
@@ -32,9 +42,29 @@
*
*
*/
-public class FileClientMessageImpl extends ClientMessageImpl
+public class FileClientMessageImpl extends ClientMessageImpl implements FileClientMessage
{
+ File file;
+
+ FileChannel currentChannel;
+
+ /**
+ *
+ */
+ public FileClientMessageImpl()
+ {
+ super();
+ }
+
+ /**
+ * @param deliveryCount
+ */
+ public FileClientMessageImpl(final int deliveryCount)
+ {
+ super(deliveryCount);
+ }
+
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -45,6 +75,103 @@
// Public --------------------------------------------------------
+ /**
+ * @return the file
+ */
+ public File getFile()
+ {
+ return file;
+ }
+
+ /**
+ * @param file the file to set
+ */
+ public void setFile(final File file)
+ {
+ this.file = file;
+ }
+
+ @Override
+ public MessagingBuffer getBody()
+ {
+ FileChannel channel = null;
+ try
+ {
+ // We open a new channel on getBody.
+ // for a better performance, users should be using the channel, instead of reading the file
+ channel = newChannel();
+
+ ByteBuffer buffer = ByteBuffer.allocate((int)channel.size());
+
+ channel.position(0);
+ channel.read(buffer);
+
+ return new ByteBufferWrapper(buffer);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ try
+ {
+ channel.close();
+ }
+ catch (Throwable ignored)
+ {
+
+ }
+ }
+ }
+
+ /**
+ * @return
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ private FileChannel newChannel() throws FileNotFoundException, IOException
+ {
+ RandomAccessFile randomFile = new RandomAccessFile(getFile(), "rw");
+ randomFile.seek(0);
+
+ FileChannel channel = randomFile.getChannel();
+ return channel;
+ }
+
+ @Override
+ public void setBody(final MessagingBuffer body)
+ {
+
+ throw new RuntimeException("Not supported");
+ }
+
+ public synchronized FileChannel getChannel() throws Exception
+ {
+ if (currentChannel == null)
+ {
+ currentChannel = newChannel();
+ }
+
+ return currentChannel;
+ }
+
+ public void closeChannel() throws Exception
+ {
+ if (currentChannel != null)
+ {
+ currentChannel.close();
+ currentChannel = null;
+ }
+
+ }
+
+ @Override
+ public int getBodySize()
+ {
+ return (int)file.length();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-14 22:36:10 UTC (rev 5114)
@@ -22,6 +22,7 @@
package org.jboss.messaging.tests.integration.chunkmessage;
+import java.io.File;
import java.nio.ByteBuffer;
import org.jboss.messaging.core.client.ClientConsumer;
@@ -59,91 +60,117 @@
// Public --------------------------------------------------------
- public void testMessageChunk() throws Exception
+ public void testMessageChunkNullPersistence() throws Exception
{
- ClientSessionFactory sf = createInVMFactory();
+ testInternal(false);
+ }
- ClientSession session = sf.createSession(false, true, true, false);
+ public void testMessageChunkFilePersistence() throws Exception
+ {
+ testInternal(true);
+ }
- session.createQueue(ADDRESS, ADDRESS, null, true, false);
+ public void testInternal(final boolean realFiles) throws Exception
+ {
- ClientProducer producer = session.createProducer(ADDRESS);
-
- MessagingBuffer body = new ByteBufferWrapper(ByteBuffer.allocate(DataConstants.SIZE_INT * 150000));
-
- for (int i = 0; i < 15000; i++)
+ clearData();
+
+ messagingService = createService(realFiles);
+ messagingService.start();
+
+ try
{
- body.putInt(i);
- }
- body.flip();
-
- //printBuffer("body to be sent : " , body);
-
- ClientMessage message = session.createClientMessage(true);
+ ClientSessionFactory sf = createInVMFactory();
- message.setBody(body);
- producer.send(message);
-
- session.close();
+ ClientSession session = sf.createSession(false, true, true, false);
- if (this.realFiles)
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ MessagingBuffer body = new ByteBufferWrapper(ByteBuffer.allocate(DataConstants.SIZE_INT * 150000));
+
+ for (int i = 0; i < 15000; i++)
+ {
+ body.putInt(i);
+ }
+ body.flip();
+
+ // printBuffer("body to be sent : " , body);
+
+ ClientMessage message = session.createClientMessage(true);
+
+ message.setBody(body);
+ producer.send(message);
+
+ session.close();
+
+ if (realFiles)
+ {
+ messagingService.stop();
+
+ messagingService = createService(realFiles);
+ messagingService.start();
+
+ sf = createInVMFactory();
+ }
+
+ session = sf.createSession(false, true, true, false);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ if (realFiles)
+ {
+ consumer.setLargeMessagesAsFiles(true);
+ consumer.setLargeMessagesDir(new File(clientLargeMessagesDir));
+ }
+
+ session.start();
+
+ ClientMessage message2 = consumer.receive(5000);
+
+ assertNotNull(message2);
+
+ System.out.println("msg on client = " + message2.getMessageID());
+
+ // printBuffer("message received : ", message2.getBody());
+
+ assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
+
+ session.close();
+ }
+ finally
{
messagingService.stop();
-
- messagingService = createService(realFiles);
- messagingService.start();
-
- sf = createInVMFactory();
}
-
- session = sf.createSession(false, true, true, false);
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- session.start();
-
- ClientMessage message2 = consumer.receive(5000);
-
- assertNotNull(message2);
-
- System.out.println("msg on client = " + message2.getMessageID());
-
- //printBuffer("message received : ", message2.getBody());
-
- assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
-
- session.close();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+ @Override
protected void setUp() throws Exception
{
- this.realFiles = true;
- super.setUp();
}
+ @Override
protected void tearDown() throws Exception
{
- super.tearDown();
}
-
-
+
// Private -------------------------------------------------------
-
- public static void printBuffer(String msg, MessagingBuffer buffer)
+
+ public static void printBuffer(final String msg, final MessagingBuffer buffer)
{
-
+
buffer.rewind();
-
+
int size = buffer.limit();
-
+
System.out.print(msg);
-
-
- for (int i = 0; i < size; i ++)
+
+ for (int i = 0; i < size; i++)
{
System.out.print(String.format("%1$X", buffer.getByte()));
if (i % 40 != 0 || i == 0)
@@ -158,7 +185,6 @@
}
buffer.rewind();
-
}
// Inner classes -------------------------------------------------
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2008-10-14 15:45:22 UTC (rev 5113)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2008-10-14 22:36:10 UTC (rev 5114)
@@ -57,8 +57,6 @@
// Attributes ----------------------------------------------------
- protected boolean realFiles = false;
-
protected static final String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName();
protected static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
@@ -74,6 +72,8 @@
protected String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/page";
protected String largeMessagesDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/large-msg";
+
+ protected String clientLargeMessagesDir = System.getProperty("java.io.tmpdir", "/tmp") + "/unit-test/client-large-msg";
protected MessagingService messagingService;
@@ -87,29 +87,14 @@
// Protected -----------------------------------------------------
- @Override
- protected void setUp() throws Exception
- {
- if (realFiles)
- {
- clearData();
- }
- messagingService = createService(realFiles);
- messagingService.start();
- }
- @Override
- protected void tearDown() throws Exception
- {
- messagingService.stop();
- }
-
protected void clearData()
{
File file = new File(journalDir);
File file2 = new File(bindingsDir);
File file3 = new File(pageDir);
File file4 = new File(largeMessagesDir);
+ File file5 = new File(clientLargeMessagesDir);
deleteDirectory(file);
file.mkdirs();
deleteDirectory(file2);
@@ -118,6 +103,8 @@
file3.mkdirs();
deleteDirectory(file4);
file4.mkdirs();
+ deleteDirectory(file5);
+ file5.mkdirs();
}
protected MessagingService createService(final boolean realFiles,
@@ -165,6 +152,7 @@
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
+ configuration.setJournalFileSize(100*1024);
configuration.setPagingDirectory(pageDir);
return configuration;
More information about the jboss-cvs-commits
mailing list