[jboss-cvs] JBoss Messaging SVN: r5418 - in trunk: src/main/org/jboss/messaging/core/client/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 21 12:28:00 EST 2008
Author: timfox
Date: 2008-11-21 12:28:00 -0500 (Fri, 21 Nov 2008)
New Revision: 5418
Added:
trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java
Removed:
trunk/src/main/org/jboss/messaging/core/client/FileClientMessage.java
trunk/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
Log:
Some changes on large message support
Copied: trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java (from rev 5412, trunk/src/main/org/jboss/messaging/core/client/FileClientMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.client;
+
+import java.io.File;
+import java.nio.channels.FileChannel;
+
+import org.jboss.messaging.core.exception.MessagingException;
+
+/**
+ * A ClientFileMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 14, 2008 3:21:15 PM
+ *
+ *
+ */
+public interface ClientFileMessage extends ClientMessage
+{
+ File getFile();
+
+ void setFile(File file);
+
+ FileChannel getChannel() throws MessagingException;
+
+ void closeChannel() throws MessagingException;
+}
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -124,7 +124,7 @@
ClientMessage createClientMessage(final boolean durable);
- FileClientMessage createFileMessage(final boolean durable);
+ ClientFileMessage createFileMessage(final boolean durable);
void start() throws MessagingException;
Deleted: trunk/src/main/org/jboss/messaging/core/client/FileClientMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/FileClientMessage.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/FileClientMessage.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -1,50 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.client;
-
-import java.io.File;
-import java.nio.channels.FileChannel;
-
-import org.jboss.messaging.core.exception.MessagingException;
-
-/**
- * A FileClientMessage
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created Oct 14, 2008 3:21:15 PM
- *
- *
- */
-public interface FileClientMessage extends ClientMessage
-{
- File getFile();
-
- void setFile(File file);
-
- FileChannel getChannel() throws MessagingException;
-
- void closeChannel() throws MessagingException;
-
-
-}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -20,7 +20,7 @@
import java.util.concurrent.Executor;
import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.client.ClientFileMessage;
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
@@ -73,7 +73,6 @@
private File directory;
private ClientMessage currentChunkMessage;
-
private volatile Thread receiverThread;
@@ -217,32 +216,7 @@
receiverThread = null;
}
}
-
-
- public ClientMessage createFileMessage(MessagingBuffer propertiesBuffer) throws Exception
- {
- if (isFileConsumer())
- {
- if (!this.directory.exists())
- {
- directory.mkdirs();
- }
-
- FileClientMessageImpl message = new FileClientMessageImpl();
- message.decodeProperties(propertiesBuffer);
- message.setFile(new File(this.directory, message.getMessageID() + "-" + this.session.getName() + "-" + this.getID() + ".jbm"));
- message.setLargeMessage(true);
- return message;
- }
- else
- {
- ClientMessageImpl message = new ClientMessageImpl();
- message.decodeProperties(propertiesBuffer);
- message.setLargeMessage(true);
- return message;
- }
- }
-
+
public ClientMessage receive() throws MessagingException
{
return receive(0);
@@ -338,8 +312,7 @@
}
ClientMessage messageToHandle = message;
-
-
+
if (isFileConsumer())
{
messageToHandle = cloneAsFileMessage(message);
@@ -372,19 +345,17 @@
flowControl(chunk.getBody().length);
-
if (chunk.getHeader() != null)
{
-
// The Header only comes on the first message, so a buffer has to be created on the client
// to hold either a file or a big message
MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
currentChunkMessage = createFileMessage(header);
- if (currentChunkMessage instanceof FileClientMessage)
+ if (currentChunkMessage instanceof ClientFileMessage)
{
- FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+ ClientFileMessage fileMessage = (ClientFileMessage)currentChunkMessage;
addBytesBody(fileMessage, chunk.getBody());
}
else
@@ -398,9 +369,9 @@
// No header.. this is then a continuation of a previous message
ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
- if (currentChunkMessage instanceof FileClientMessage)
+ if (currentChunkMessage instanceof ClientFileMessage)
{
- FileClientMessage fileMessage = (FileClientMessage)currentChunkMessage;
+ ClientFileMessage fileMessage = (ClientFileMessage)currentChunkMessage;
addBytesBody(fileMessage, chunk.getBody());
}
else
@@ -419,18 +390,17 @@
if (!chunk.isContinues())
{
// Close the file that was being generated
- if (currentChunkMessage instanceof FileClientMessage)
+ if (currentChunkMessage instanceof ClientFileMessage)
{
- ((FileClientMessage)currentChunkMessage).closeChannel();
+ ((ClientFileMessage)currentChunkMessage).closeChannel();
}
+
ClientMessage msgToSend = currentChunkMessage;
currentChunkMessage = null;
handleMessage(msgToSend);
- }
-
+ }
}
-
public void clear()
{
synchronized (this)
@@ -642,19 +612,19 @@
session.acknowledge(id, message.getMessageID());
}
-
- private FileClientMessage cloneAsFileMessage(ClientMessage message) throws Exception
+ private ClientFileMessage cloneAsFileMessage(final ClientMessage message) throws Exception
{
int propertiesSize = message.getPropertiesEncodeSize();
+
MessagingBuffer bufferProperties = message.getBody().createNewBuffer(propertiesSize);
- // FIXME: Find a better way to clone this ClientMessageImpl as FileClientMessageImpl without using the MessagingBuffer.
+ // FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the MessagingBuffer.
// There is no direct access to the Properties, and I couldn't add a direct cast to this method without losing abstraction
message.encodeProperties(bufferProperties);
bufferProperties.rewind();
- FileClientMessageImpl cloneMessage = new FileClientMessageImpl();
+ ClientFileMessageImpl cloneMessage = new ClientFileMessageImpl();
cloneMessage.decodeProperties(bufferProperties);
@@ -667,15 +637,35 @@
addBytesBody(cloneMessage, message.getBody().array());
cloneMessage.closeChannel();
-
-
+
return cloneMessage;
}
+ private ClientMessage createFileMessage(final MessagingBuffer propertiesBuffer) throws Exception
+ {
+ if (isFileConsumer())
+ {
+ if (!this.directory.exists())
+ {
+ directory.mkdirs();
+ }
+
+ ClientFileMessageImpl message = new ClientFileMessageImpl();
+ message.decodeProperties(propertiesBuffer);
+ message.setFile(new File(this.directory, message.getMessageID() + "-" + this.session.getName() + "-" + this.getID() + ".jbm"));
+ message.setLargeMessage(true);
+ return message;
+ }
+ else
+ {
+ ClientMessageImpl message = new ClientMessageImpl();
+ message.decodeProperties(propertiesBuffer);
+ message.setLargeMessage(true);
+ return message;
+ }
+ }
-
-
- private void addBytesBody(FileClientMessage fileMessage, byte[] body) throws Exception
+ private void addBytesBody(final ClientFileMessage fileMessage, final byte[] body) throws Exception
{
FileChannel channel = fileMessage.getChannel();
channel.write(ByteBuffer.wrap(body));
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -26,7 +26,6 @@
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
*
@@ -53,8 +52,6 @@
void cleanUp() throws Exception;
- ClientMessage createFileMessage(MessagingBuffer propertiesBuffer) throws Exception;
-
void acknowledge(ClientMessage message) throws MessagingException;
void flushAcks() throws MessagingException;
Copied: trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java (from rev 5412, trunk/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -0,0 +1,231 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.client.impl;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.jboss.messaging.core.client.ClientFileMessage;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * A ClientFileMessageImpl
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 13, 2008 4:33:56 PM
+ *
+ *
+ */
+public class ClientFileMessageImpl extends ClientMessageImpl implements ClientFileMessage
+{
+ private File file;
+
+ private FileChannel currentChannel;
+
+ public ClientFileMessageImpl()
+ {
+ }
+
+ public ClientFileMessageImpl(final boolean durable)
+ {
+ super(durable, null);
+ }
+
+ /**
+ * @param type
+ * @param durable
+ * @param expiration
+ * @param timestamp
+ * @param priority
+ * @param body
+ */
+ public ClientFileMessageImpl(final byte type,
+ final boolean durable,
+ final long expiration,
+ final long timestamp,
+ final byte priority,
+ final MessagingBuffer body)
+ {
+ super(type, durable, expiration, timestamp, priority, body);
+ }
+
+ /**
+ * @param type
+ * @param durable
+ * @param body
+ */
+ public ClientFileMessageImpl(final byte type, final boolean durable, final MessagingBuffer body)
+ {
+ super(type, durable, body);
+ }
+
+ /**
+ * @param deliveryCount
+ */
+ public ClientFileMessageImpl(final int deliveryCount)
+ {
+ super(deliveryCount);
+ }
+
+ /**
+ * @return the file
+ */
+ public File getFile()
+ {
+ return file;
+ }
+
+ /**
+ * @param file the file to set
+ */
+ public void setFile(final File file)
+ {
+ this.file = file;
+ }
+
+ @Override
+ public MessagingBuffer getBody()
+ {
+ // TODO: Throw an unsupported exception. (Make sure no tests are using this method first)
+
+ FileChannel channel = null;
+ try
+ {
+ // We open a new channel on getBody.
+ // for better performance, users should use the channel directly when working with file messages
+ channel = newChannel();
+
+ ByteBuffer buffer = ByteBuffer.allocate((int)channel.size());
+
+ channel.position(0);
+ channel.read(buffer);
+
+ return new ByteBufferWrapper(buffer);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ try
+ {
+ channel.close();
+ }
+ catch (Throwable ignored)
+ {
+
+ }
+ }
+ }
+
+ @Override
+ public synchronized void encodeBody(final MessagingBuffer buffer, final long start, final int size)
+ {
+ try
+ {
+ FileChannel channel = getChannel();
+
+ ByteBuffer bufferRead = ByteBuffer.allocate(size);
+
+ channel.position(start);
+ channel.read(bufferRead);
+
+ buffer.putBytes(bufferRead.array());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void setBody(final MessagingBuffer body)
+ {
+ throw new RuntimeException("Not supported");
+ }
+
+ public synchronized FileChannel getChannel() throws MessagingException
+ {
+ if (currentChannel == null)
+ {
+ currentChannel = newChannel();
+ }
+
+ return currentChannel;
+ }
+
+ public synchronized void closeChannel() throws MessagingException
+ {
+ if (currentChannel != null)
+ {
+ try
+ {
+ currentChannel.close();
+ }
+ catch (IOException e)
+ {
+ throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
+ }
+ currentChannel = null;
+ }
+
+ }
+
+ @Override
+ public synchronized int getBodySize()
+ {
+ return (int)file.length();
+ }
+
+ /**
+ * @return
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ private FileChannel newChannel() throws MessagingException
+ {
+ try
+ {
+ RandomAccessFile randomFile = new RandomAccessFile(getFile(), "rw");
+
+ randomFile.seek(0);
+
+ FileChannel channel = randomFile.getChannel();
+
+ return channel;
+ }
+ catch (IOException e)
+ {
+ throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
+ }
+ }
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -25,7 +25,7 @@
import java.nio.ByteBuffer;
import org.jboss.messaging.core.client.AcknowledgementHandler;
-import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.client.ClientFileMessage;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
@@ -74,14 +74,14 @@
private final boolean blockOnPersistentSend;
private final SimpleString groupID;
-
+
private final int minLargeMessageSize;
// Static ---------------------------------------------------------------------------------------
// Constructors ---------------------------------------------------------------------------------
- public ClientProducerImpl(final ClientSessionInternal session,
+ public ClientProducerImpl(final ClientSessionInternal session,
final SimpleString address,
final TokenBucketLimiter rateLimiter,
final boolean blockOnNonPersistentSend,
@@ -93,13 +93,13 @@
this.channel = channel;
this.session = session;
-
+
this.address = address;
this.rateLimiter = rateLimiter;
this.blockOnNonPersistentSend = blockOnNonPersistentSend;
-
+
this.blockOnPersistentSend = blockOnPersistentSend;
if (autoGroup)
@@ -110,9 +110,8 @@
{
this.groupID = null;
}
-
- this.minLargeMessageSize = minLargeMessageSize;
+ this.minLargeMessageSize = minLargeMessageSize;
}
// ClientProducer implementation ----------------------------------------------------------------
@@ -153,7 +152,7 @@
return;
}
- doCleanup();
+ doCleanup();
}
public void cleanUp()
@@ -225,15 +224,14 @@
}
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
-
+
SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
if (msg.getEncodeSize() > minLargeMessageSize)
{
sendMessageInChunks(true, msg);
}
- else
- if (sendBlocking)
+ else if (sendBlocking)
{
channel.sendBlocking(message);
}
@@ -254,7 +252,7 @@
if (headerSize > minLargeMessageSize)
{
throw new MessagingException(MessagingException.ILLEGAL_STATE,
- "Header size is too big, use the messageBody for large data");
+ "Header size is too big, use the messageBody for large data, or increase minLargeMessageSize");
}
MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(headerSize));
@@ -262,16 +260,16 @@
final int bodySize = msg.getBodySize();
- int bodyLength = minLargeMessageSize - headerSize;
+ int chunkLength = minLargeMessageSize - headerSize;
- MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+ MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
- msg.encodeBody(bodyBuffer, 0, bodyLength);
+ msg.encodeBody(bodyBuffer, 0, chunkLength);
SessionSendChunkMessage chunk = new SessionSendChunkMessage(-1,
headerBuffer.array(),
bodyBuffer.array(),
- bodyLength < bodySize,
+ chunkLength < bodySize,
sendBlocking);
if (sendBlocking)
@@ -283,14 +281,15 @@
channel.send(chunk);
}
- for (int pos = bodyLength; pos < bodySize;)
+ for (int pos = chunkLength; pos < bodySize;)
{
- bodyLength = Math.min(bodySize - pos, minLargeMessageSize);
- bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(bodyLength));
+ chunkLength = Math.min(bodySize - pos, minLargeMessageSize);
+
+ bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
- msg.encodeBody(bodyBuffer, pos, bodyLength);
+ msg.encodeBody(bodyBuffer, pos, chunkLength);
- chunk = new SessionSendChunkMessage(-1, null, bodyBuffer.array(), pos + bodyLength < bodySize, sendBlocking);
+ chunk = new SessionSendChunkMessage(-1, null, bodyBuffer.array(), pos + chunkLength < bodySize, sendBlocking);
if (sendBlocking)
{
@@ -301,14 +300,14 @@
channel.send(chunk);
}
- pos += bodyLength;
+ pos += chunkLength;
}
-
- if (msg instanceof FileClientMessage)
+
+ if (msg instanceof ClientFileMessage)
{
try
{
- ((FileClientMessage)msg).closeChannel();
+ ((ClientFileMessage)msg).closeChannel();
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -37,7 +37,7 @@
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.client.ClientFileMessage;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
@@ -476,9 +476,9 @@
return new ClientMessageImpl(durable, body);
}
- public FileClientMessage createFileMessage(final boolean durable)
+ public ClientFileMessage createFileMessage(final boolean durable)
{
- return new FileClientMessageImpl(durable);
+ return new ClientFileMessageImpl(durable);
}
public boolean isClosed()
@@ -622,7 +622,6 @@
{
consumer.handleChunk(chunk);
}
-
}
public void close() throws MessagingException
Deleted: trunk/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -1,256 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.client.impl;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.FileClientMessage;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * A FileClientMessageImpl
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created Oct 13, 2008 4:33:56 PM
- *
- *
- */
-public class FileClientMessageImpl extends ClientMessageImpl implements FileClientMessage
-{
-
- File file;
-
- FileChannel currentChannel;
-
- /**
- *
- */
- public FileClientMessageImpl()
- {
- super();
- }
-
- public FileClientMessageImpl(final boolean durable)
- {
- super(durable, null);
- }
-
- /**
- * @param type
- * @param durable
- * @param expiration
- * @param timestamp
- * @param priority
- * @param body
- */
- public FileClientMessageImpl(final byte type,
- final boolean durable,
- final long expiration,
- final long timestamp,
- final byte priority,
- final MessagingBuffer body)
- {
- super(type, durable, expiration, timestamp, priority, body);
- }
-
- /**
- * @param type
- * @param durable
- * @param body
- */
- public FileClientMessageImpl(final byte type, final boolean durable, final MessagingBuffer body)
- {
- super(type, durable, body);
- }
-
- /**
- * @param deliveryCount
- */
- public FileClientMessageImpl(final int deliveryCount)
- {
- super(deliveryCount);
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- /**
- * @return the file
- */
- public File getFile()
- {
- return file;
- }
-
- /**
- * @param file the file to set
- */
- public void setFile(final File file)
- {
- this.file = file;
- }
-
- @Override
- public MessagingBuffer getBody()
- {
- // TODO: Throw an unsuported exception. (Make sure no tests are using this method first)
-
- FileChannel channel = null;
- try
- {
- // We open a new channel on getBody.
- // for a better performance, users should be using the channels when using file
- channel = newChannel();
-
- ByteBuffer buffer = ByteBuffer.allocate((int)channel.size());
-
- channel.position(0);
- channel.read(buffer);
-
- return new ByteBufferWrapper(buffer);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- try
- {
- channel.close();
- }
- catch (Throwable ignored)
- {
-
- }
- }
- }
-
- @Override
- public synchronized void encodeBody(final MessagingBuffer buffer, final long start, final int size)
- {
- try
- {
- FileChannel channel = getChannel();
-
- ByteBuffer bufferRead = ByteBuffer.allocate(size);
-
- channel.position(start);
- channel.read(bufferRead);
-
- buffer.putBytes(bufferRead.array());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
-
- }
-
- @Override
- public void setBody(final MessagingBuffer body)
- {
-
- throw new RuntimeException("Not supported");
- }
-
- public synchronized FileChannel getChannel() throws MessagingException
- {
- if (currentChannel == null)
- {
- currentChannel = newChannel();
- }
-
- return currentChannel;
- }
-
- public synchronized void closeChannel() throws MessagingException
- {
- if (currentChannel != null)
- {
- try
- {
- currentChannel.close();
- }
- catch (IOException e)
- {
- throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
- }
- currentChannel = null;
- }
-
- }
-
- @Override
- public synchronized int getBodySize()
- {
- return (int)file.length();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- /**
- * @return
- * @throws FileNotFoundException
- * @throws IOException
- */
- private FileChannel newChannel() throws MessagingException
- {
- try
- {
- RandomAccessFile randomFile = new RandomAccessFile(getFile(), "rw");
- randomFile.seek(0);
-
- FileChannel channel = randomFile.getChannel();
- return channel;
- }
- catch (IOException e)
- {
- throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
- }
- }
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -84,7 +84,6 @@
file.position(file.size());
file.write(ByteBuffer.wrap(bytes), false);
-
}
@Override
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -44,7 +44,7 @@
private boolean continues;
- private long messageID = 0;
+ private long messageID;
private boolean requiresResponse;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -86,7 +86,6 @@
public void encodeBody(final MessagingBuffer buffer)
{
-
if (clientMessage != null)
{
clientMessage.encode(buffer);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -33,7 +33,6 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.Channel;
@@ -88,7 +87,7 @@
private final Filter filter;
private final int minLargeMessageSize;
-
+
private final ServerSession session;
private final Lock lock = new ReentrantLock();
@@ -96,9 +95,9 @@
private AtomicInteger availableCredits = new AtomicInteger(0);
private boolean started;
-
- private volatile LargeMessageControl largeMessage = null;
+ private volatile LargeMessageSender largeMessageSender = null;
+
/**
* if we are a browse only consumer we don't need to worry about acknowledgements or being started/stopped by the session.
*/
@@ -111,12 +110,8 @@
private final PostOffice postOffice;
private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
-
-
private final Channel channel;
-
- private final PagingManager pager;
private volatile boolean closed;
@@ -135,7 +130,6 @@
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final PostOffice postOffice,
final Channel channel,
- final PagingManager pager,
final boolean preCommitAcks)
{
this.id = id;
@@ -149,7 +143,7 @@
this.started = browseOnly || started;
this.browseOnly = browseOnly;
-
+
this.storageManager = storageManager;
this.queueSettingsRepository = queueSettingsRepository;
@@ -157,13 +151,11 @@
this.postOffice = postOffice;
this.channel = channel;
-
- this.pager = pager;
this.preCommitAcks = preCommitAcks;
messageQueue.addConsumer(this);
-
+
this.minLargeMessageSize = session.getMinLargeMessageSize();
}
@@ -317,16 +309,16 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
- //No flow control
+ // No flow control
availableCredits = null;
}
else
{
int previous = availableCredits.getAndAdd(credits);
-
+
if (previous <= 0 && previous + credits > 0)
{
promptDelivery();
@@ -339,8 +331,7 @@
return messageQueue;
}
- public void acknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID)
- throws Exception
+ public void acknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
{
if (browseOnly)
{
@@ -375,7 +366,7 @@
// Del count is not actually updated in storage unless it's
// cancelled
ref.incrementDeliveryCount();
- }
+ }
}
while (ref.getMessage().getMessageID() != messageID);
@@ -387,27 +378,27 @@
{
return null;
}
-
- //Expiries can come in our of sequence with respect to delivery order
-
+
+ // Expiries can come in our of sequence with respect to delivery order
+
Iterator<MessageReference> iter = deliveringRefs.iterator();
-
+
MessageReference ref = null;
-
+
while (iter.hasNext())
{
MessageReference theRef = iter.next();
-
+
if (theRef.getMessage().getMessageID() == messageID)
{
iter.remove();
-
+
ref = theRef;
-
+
break;
}
}
-
+
if (ref == null)
{
throw new IllegalStateException("Could not find reference with id " + messageID +
@@ -416,7 +407,7 @@
" closed " +
closed);
}
-
+
return ref;
}
@@ -492,12 +483,12 @@
queue.referenceAcknowledged(ref);
}
-
+
private void promptDelivery()
{
- if (largeMessage != null)
+ if (largeMessageSender != null)
{
- if (largeMessage.sendLargeMessage())
+ if (largeMessageSender.sendLargeMessage())
{
// prompt Delivery only if chunk was finished
session.promptDelivery(messageQueue);
@@ -522,7 +513,7 @@
{
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
- if (largeMessage != null)
+ if (largeMessageSender != null)
{
return HandleStatus.BUSY;
}
@@ -547,19 +538,19 @@
deliveringRefs.add(ref);
}
-
if (message instanceof ServerLargeMessage)
{
// TODO: How to inform the backup node about the LargeMessage being sent?
- largeMessage = new LargeMessageControl((ServerLargeMessage)message);
- largeMessage.sendLargeMessage();
+ largeMessageSender = new LargeMessageSender((ServerLargeMessage)message);
+
+ largeMessageSender.sendLargeMessage();
}
else
{
- sendRegularMessage(ref, message);
+ sendStandardMessage(ref, message);
}
- if(preCommitAcks)
+ if (preCommitAcks)
{
doAck(ref);
}
@@ -576,7 +567,7 @@
* @param ref
* @param message
*/
- private void sendRegularMessage(final MessageReference ref, final ServerMessage message)
+ private void sendStandardMessage(final MessageReference ref, final ServerMessage message)
{
if (availableCredits != null)
{
@@ -605,45 +596,36 @@
}
}
-
-
-
-
-
// Inner classes
// ------------------------------------------------------------------------
-
-
+
/** Internal encapsulation of the logic on sending LargeMessages.
* This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
- class LargeMessageControl
+ private class LargeMessageSender
{
- private volatile long sizePendingLargeMessage;
+ private long sizePendingLargeMessage;
/** The current message being processed */
- private volatile ServerLargeMessage pendingLargeMessage;
-
+ private ServerLargeMessage pendingLargeMessage;
+
/** The current position on the message being processed */
- private volatile long positionPendingLargeMessage;
-
- private SessionSendChunkMessage readAheadChunk = null;
-
- public LargeMessageControl(ServerLargeMessage message)
+ private long positionPendingLargeMessage;
+
+ private SessionSendChunkMessage readAheadChunk;
+
+ public LargeMessageSender(final ServerLargeMessage message)
{
pendingLargeMessage = (ServerLargeMessage)message;
- positionPendingLargeMessage = 0;
+
sizePendingLargeMessage = pendingLargeMessage.getBodySize();
}
-
-
+
public boolean sendLargeMessage()
{
-
lock.lock();
try
{
-
if (pendingLargeMessage == null)
{
return true;
@@ -657,18 +639,21 @@
if (readAheadChunk != null)
{
int chunkLen = readAheadChunk.getBody().length;
+
positionPendingLargeMessage += chunkLen;
+
channel.send(readAheadChunk);
+
readAheadChunk = null;
+
if (availableCredits != null)
{
availableCredits.addAndGet(-chunkLen);
}
}
-
+
while (positionPendingLargeMessage < sizePendingLargeMessage)
{
-
if (availableCredits != null && availableCredits.get() <= 0)
{
if (readAheadChunk == null)
@@ -677,41 +662,39 @@
}
return false;
}
-
+
SessionSendChunkMessage chunk = createChunkSend();
-
+
int chunkLen = chunk.getBody().length;
if (availableCredits != null)
{
availableCredits.addAndGet(-chunkLen);
}
-
+
channel.send(chunk);
-
+
positionPendingLargeMessage += chunkLen;
}
-
+
pendingLargeMessage.releaseResources();
- ServerConsumerImpl.this.largeMessage = null;
-
+ ServerConsumerImpl.this.largeMessageSender = null;
+
return true;
}
finally
{
lock.unlock();
}
-
-
}
private SessionSendChunkMessage createChunkSend()
{
SessionSendChunkMessage chunk;
-
+
int localChunkLen = 0;
-
+
if (positionPendingLargeMessage == 0)
{
int headerSize = pendingLargeMessage.getPropertiesEncodeSize();
@@ -724,7 +707,6 @@
MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
pendingLargeMessage.encodeBody(bodyBuffer, 0, localChunkLen);
-
chunk = new SessionSendChunkMessage(id,
headerBuffer.array(),
bodyBuffer.array(),
@@ -745,11 +727,8 @@
positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
false);
}
-
+
return chunk;
-
}
-
}
-
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -200,7 +200,7 @@
private final SimpleString managementAddress;
- private ServerLargeMessage largeMessage;
+ private volatile ServerLargeMessage largeMessage;
// Constructors ---------------------------------------------------------------------------------
@@ -324,8 +324,7 @@
}
catch (Throwable error)
{
- log.warn(error.toString(), error);
-
+ log.error("Failed to delete large message file", error);
}
}
@@ -394,8 +393,7 @@
storageManager,
queueSettingsRepository,
postOffice,
- channel,
- pager,
+ channel,
preCommitAcks);
consumers.put(consumer.getID(), consumer);
@@ -2164,7 +2162,7 @@
{
if (packet.getHeader() != null)
{
- largeMessage = createLargeMessageStorage(packet.getTargetID(), packet.getMessageID(), packet.getHeader());
+ largeMessage = createLargeMessageStorage(packet.getMessageID(), packet.getHeader());
}
largeMessage.addBytes(packet.getBody());
@@ -2172,9 +2170,11 @@
if (!packet.isContinues())
{
final ServerLargeMessage message = largeMessage;
+
largeMessage = null;
message.complete();
+
send(message);
}
@@ -2182,7 +2182,6 @@
{
response = new NullResponseMessage();
}
-
}
catch (Exception e)
{
@@ -2409,7 +2408,7 @@
// Private
// ----------------------------------------------------------------------------
- private ServerLargeMessage createLargeMessageStorage(long producerID, long messageID, byte[] header) throws Exception
+ private ServerLargeMessage createLargeMessageStorage(long messageID, byte[] header) throws Exception
{
ServerLargeMessage largeMessage = storageManager.createLargeMessage();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -34,7 +34,7 @@
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.client.ClientFileMessage;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.impl.MessageImpl;
@@ -137,7 +137,7 @@
for (int i = 0; i < numberOfMessages; i++)
{
ClientMessage message = session.createFileMessage(true);
- ((FileClientMessage)message).setFile(tmpData);
+ ((ClientFileMessage)message).setFile(tmpData);
message.putIntProperty(new SimpleString("counter-message"), i);
long timeStart = System.currentTimeMillis();
if (delayDelivery > 0)
@@ -223,7 +223,7 @@
if (realFiles)
{
- assertTrue (message instanceof FileClientMessage);
+ assertTrue (message instanceof ClientFileMessage);
}
if (testTime)
@@ -250,9 +250,9 @@
if (!testTime)
{
- if (message instanceof FileClientMessage)
+ if (message instanceof ClientFileMessage)
{
- checkFileRead(((FileClientMessage)message).getFile(), numberOfIntegers);
+ checkFileRead(((ClientFileMessage)message).getFile(), numberOfIntegers);
}
else
{
@@ -298,10 +298,10 @@
}
- protected FileClientMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers) throws Exception
+ protected ClientFileMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers) throws Exception
{
- FileClientMessage clientMessage = session.createFileMessage(true);
+ ClientFileMessage clientMessage = session.createFileMessage(true);
File tmpFile = createLargeFile(temporaryDir, "tmpUpload.data", numberOfIntegers);
@@ -374,17 +374,17 @@
assertNotNull(clientMessage);
- if (!(clientMessage instanceof FileClientMessage))
+ if (!(clientMessage instanceof ClientFileMessage))
{
System.out.println("Size = " + clientMessage.getBodySize());
}
- if (clientMessage instanceof FileClientMessage)
+ if (clientMessage instanceof ClientFileMessage)
{
- assertTrue(clientMessage instanceof FileClientMessage);
+ assertTrue(clientMessage instanceof ClientFileMessage);
- FileClientMessage fileClientMessage = (FileClientMessage)clientMessage;
+ ClientFileMessage fileClientMessage = (ClientFileMessage)clientMessage;
assertNotNull(fileClientMessage);
File receivedFile = fileClientMessage.getFile();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-11-21 15:24:58 UTC (rev 5417)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-11-21 17:28:00 UTC (rev 5418)
@@ -34,7 +34,7 @@
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.FileClientMessage;
+import org.jboss.messaging.core.client.ClientFileMessage;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.config.Configuration;
@@ -166,7 +166,7 @@
ClientProducer producer = session.createProducer(ADDRESS);
- FileClientMessage clientLarge = createLargeClientMessage(session, numberOfIntegersBigMessage);
+ ClientFileMessage clientLarge = createLargeClientMessage(session, numberOfIntegersBigMessage);
try
{
@@ -523,7 +523,7 @@
producer.send(message);
}
- FileClientMessage clientFile = createLargeClientMessage(session, numberOfIntegersBigMessage);
+ ClientFileMessage clientFile = createLargeClientMessage(session, numberOfIntegersBigMessage);
producer.send(clientFile);
More information about the jboss-cvs-commits
mailing list