Author: ataylor
Date: 2009-11-03 10:00:57 -0500 (Tue, 03 Nov 2009)
New Revision: 8193
Added:
trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/journal/SequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/hornetq/core/message/Message.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-190 - each LargeMessageDeliverer now uses its own file handle
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-03
14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-03
15:00:57 UTC (rev 8193)
@@ -17,11 +17,13 @@
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
+import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
@@ -368,6 +370,8 @@
{
final long bodySize = msg.getLargeBodySize();
+ LargeMessageEncodingContext context = new DecodingContext(msg);
+
for (int pos = 0; pos < bodySize;)
{
final boolean lastChunk;
@@ -376,7 +380,7 @@
final HornetQBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
- msg.encodeBody(bodyBuffer, pos, chunkLength);
+ msg.encodeBody(bodyBuffer, context, chunkLength);
pos += chunkLength;
@@ -408,5 +412,34 @@
}
// Inner Classes --------------------------------------------------------------------------------
+   /**
+    * In-memory {@link LargeMessageEncodingContext} for a message whose body is already held
+    * by the {@link Message}: every call to {@link #write(HornetQBuffer, int)} copies the next
+    * {@code size} bytes of the body and remembers where it stopped.
+    * <p>
+    * Declared {@code static} because it only needs the message passed to its constructor and
+    * never touches the enclosing producer.
+    */
+   static class DecodingContext implements LargeMessageEncodingContext
+   {
+      private final Message message;
+
+      /** Offset into the message body of the next byte to copy. */
+      private int lastPos = 0;
+
+      /**
+       * @param message the message whose body will be copied chunk by chunk
+       */
+      public DecodingContext(final Message message)
+      {
+         this.message = message;
+      }
+
+      /** Nothing to open: the body is read straight from the message. */
+      public void open() throws Exception
+      {
+      }
+
+      /** Nothing to close: no resource is held. */
+      public void close() throws Exception
+      {
+      }
+
+      /**
+       * Not supported by this context.
+       *
+       * @return always {@code -1}; {@code bufferRead} is left untouched
+       */
+      public int write(final ByteBuffer bufferRead) throws Exception
+      {
+         return -1;
+      }
+
+      /**
+       * Copies {@code size} bytes of the message body, starting at the current offset, into
+       * {@code bufferOut} and then advances the offset by {@code size}.
+       *
+       * @param bufferOut destination buffer
+       * @param size      number of bytes to copy
+       * @return {@code size}
+       */
+      public int write(final HornetQBuffer bufferOut, final int size)
+      {
+         bufferOut.writeBytes(message.getBody(), lastPos, size);
+         lastPos += size;
+         return size;
+      }
+   }
}
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-03 14:32:35 UTC
(rev 8192)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-03 15:00:57 UTC
(rev 8193)
@@ -85,4 +85,5 @@
void enableAutoFlush();
+ SequentialFile copy();
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-03
14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-03
15:00:57 UTC (rev 8193)
@@ -29,6 +29,7 @@
import org.hornetq.core.asyncio.impl.TimedBufferObserver;
import org.hornetq.core.journal.IOCallback;
import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -126,6 +127,11 @@
timedBuffer.enableAutoFlush();
}
+   /**
+    * Creates another {@code AIOSequentialFile} for the same parent directory and file name
+    * as this one, handing over this instance's factory, {@code maxIO}, buffer callback and
+    * both executors. The new object is only constructed here; it is not opened.
+    *
+    * @return a new file object for the same file
+    */
+   public SequentialFile copy()
+   {
+      final String directory = getFile().getParent();
+
+      // NOTE(review): the two -1 arguments are passed through unchanged; their meaning is
+      // defined by the constructor, which is not visible here - confirm before altering.
+      return new AIOSequentialFile(factory, -1, -1, directory, getFileName(), maxIO, bufferCallback, executor, pollerExecutor);
+   }
+
public synchronized void close() throws Exception
{
if (!opened)
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-03
14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-03
15:00:57 UTC (rev 8193)
@@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -48,6 +49,11 @@
super(directory, new File(directory + "/" + fileName));
}
+ /**
+  * Creates a sequential file for an existing {@link File}: the file's parent path is
+  * passed to the superclass as the directory, together with a new {@code File} built
+  * from the same path.
+  *
+  * @param file the file this object should represent
+  */
+ public NIOSequentialFile(File file)
+ {
+ super(file.getParent(), new File(file.getPath()));
+ }
+
public int getAlignment()
{
return 1;
@@ -265,4 +271,8 @@
{
}
+   /**
+    * Builds a second {@code NIOSequentialFile} over the same underlying {@link File}.
+    * The new object is only constructed here; it is not opened.
+    *
+    * @return a new file object referring to the same file as this one
+    */
+   public SequentialFile copy()
+   {
+      final File underlying = getFile();
+
+      return new NIOSequentialFile(underlying);
+   }
}
Added: trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java 2009-11-03
15:00:57 UTC (rev 8193)
@@ -0,0 +1,20 @@
+package org.hornetq.core.message;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Per-reader state used while the body of a large message is encoded chunk by chunk.
+ * <p>
+ * The implementations that accompany this interface each support only one of the two
+ * {@code write} variants and return {@code -1} from the other: the file-backed one
+ * implements {@link #write(ByteBuffer)}, the in-memory ones implement
+ * {@link #write(HornetQBuffer, int)}.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Nov 2, 2009
+ */
+public interface LargeMessageEncodingContext
+{
+ /**
+  * Prepares the context for use. The file-backed implementation obtains and opens its own
+  * copy of the message file; the in-memory implementations do nothing.
+  */
+ void open() throws Exception;
+
+ /**
+  * Releases whatever {@link #open()} acquired. The file-backed implementation closes its
+  * file copy; the in-memory implementations do nothing.
+  */
+ void close() throws Exception;
+
+ /**
+  * Despite the name, this fills {@code bufferRead} with body data: the file-backed
+  * implementation reads from its file into the buffer and returns the result of that
+  * read. The in-memory implementations return {@code -1} without touching the buffer.
+  */
+ int write(ByteBuffer bufferRead) throws Exception;
+
+ /**
+  * Writes the next {@code size} bytes of the body into {@code bufferOut}. The in-memory
+  * implementations copy from the message body at their current offset, advance the offset
+  * and return {@code size}; the file-backed implementation returns {@code -1} without
+  * writing anything.
+  */
+ int write(HornetQBuffer bufferOut, int size);
+}
Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java 2009-11-03 14:32:35 UTC (rev
8192)
+++ trunk/src/main/org/hornetq/core/message/Message.java 2009-11-03 15:00:57 UTC (rev
8193)
@@ -18,6 +18,7 @@
import java.util.Set;
import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -81,7 +82,7 @@
long getLargeBodySize();
// Used on Message chunk
- void encodeBody(HornetQBuffer buffer, long start, int size);
+ void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size);
/** Set the InputStream used on a message that will be sent over a producer */
void setBodyInputStream(InputStream stream);
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-03 14:32:35 UTC
(rev 8192)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-03 15:00:57 UTC
(rev 8193)
@@ -27,6 +27,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -213,10 +214,10 @@
buffer.writeBytes(localBody.array(), 0, localBody.writerIndex());
}
- // Used on Message chunk
- public void encodeBody(HornetQBuffer buffer, long start, int size)
+ // Used on Message chunk side
+ public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
{
- buffer.writeBytes(body, (int)start, size);
+ context.write(bufferOut, size);
}
public void decode(final HornetQBuffer buffer)
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-11-03
14:32:35 UTC (rev 8192)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-11-03
15:00:57 UTC (rev 8193)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.core.server.impl.ServerMessageImpl;
/**
@@ -105,25 +106,15 @@
bodySize += bytes.length;
}
- @Override
- public synchronized void encodeBody(final HornetQBuffer bufferOut, final long start, final int size)
+ public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
{
try
{
- validateFile();
-
// This could maybe be optimized (maybe reading directly into bufferOut)
ByteBuffer bufferRead = ByteBuffer.allocate(size);
- if (!file.isOpen())
- {
- file.open();
- }
- int bytesRead = 0;
- file.position(start);
+ int bytesRead = context.write(bufferRead);
- bytesRead = file.read(bufferRead);
-
bufferRead.flip();
if (bytesRead > 0)
@@ -200,6 +191,11 @@
}
}
+   /**
+    * Creates a fresh encoding context for this message. The context is returned unopened.
+    *
+    * @return a new {@code DecodingContext} bound to this message
+    */
+   public LargeMessageEncodingContext createNewContext()
+   {
+      final LargeMessageEncodingContext freshContext = new DecodingContext();
+
+      return freshContext;
+   }
+
private void checkDelete() throws Exception
{
if (getRefCount() <= 0)
@@ -375,11 +371,33 @@
{
throw new RuntimeException("could not setup linked file", e);
}
- finally
- {
- }
}
// Inner classes -------------------------------------------------
+   /**
+    * {@link LargeMessageEncodingContext} that reads the message body through its own
+    * {@link SequentialFile}, obtained with {@code file.copy()}, instead of through the
+    * handle held by the enclosing message.
+    */
+   class DecodingContext implements LargeMessageEncodingContext
+   {
+      /** This context's own file object; {@code null} until {@link #open()} has assigned it. */
+      private SequentialFile cFile;
+
+      /**
+       * Obtains a copy of the message file and opens it.
+       */
+      public void open() throws Exception
+      {
+         // NOTE(review): encodeBody() called validateFile() before using 'file' until this
+         // change moved the file access here; the same guard is kept so that 'file' is set
+         // up before it is copied - confirm validateFile() is safe to call repeatedly.
+         validateFile();
+
+         cFile = file.copy();
+         cFile.open();
+      }
+
+      /**
+       * Closes this context's file copy. Does nothing if {@link #open()} never assigned one,
+       * so a cleanup path that closes unconditionally cannot fail with a NullPointerException.
+       */
+      public void close() throws Exception
+      {
+         if (cFile != null)
+         {
+            cFile.close();
+         }
+      }
+
+      /**
+       * Reads from this context's file copy into {@code bufferRead}.
+       *
+       * @return whatever {@code SequentialFile.read} returns for this read
+       */
+      public int write(final ByteBuffer bufferRead) throws Exception
+      {
+         return cFile.read(bufferRead);
+      }
+
+      /**
+       * Not supported by this context.
+       *
+       * @return always {@code -1}; {@code bufferOut} is left untouched
+       */
+      public int write(final HornetQBuffer bufferOut, final int size)
+      {
+         return -1;
+      }
+   }
}
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-03
14:32:35 UTC (rev 8192)
+++
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-03
15:00:57 UTC (rev 8193)
@@ -16,8 +16,11 @@
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.core.server.impl.ServerMessageImpl;
+import java.nio.ByteBuffer;
+
/**
* A NullStorageLargeServerMessage
*
@@ -151,6 +154,49 @@
}
+   /**
+    * Creates a fresh in-memory encoding context for this message, positioned at the start
+    * of the body.
+    *
+    * @return a new {@code DecodingContext} bound to this message
+    */
+   public LargeMessageEncodingContext createNewContext()
+   {
+      final LargeMessageEncodingContext freshContext = new DecodingContext();
+
+      return freshContext;
+   }
+
+   /**
+    * Writes the next {@code size} bytes of the body into {@code bufferOut} by delegating to
+    * {@code context.write(bufferOut, size)}.
+    * <p>
+    * The context is used through the {@link LargeMessageEncodingContext} interface, which
+    * already declares this {@code write} variant, so any implementation is accepted rather
+    * than only this class's own {@code DecodingContext}.
+    *
+    * @param bufferOut destination buffer
+    * @param context   encoding context that tracks the read position
+    * @param size      number of bytes to write
+    * @throws RuntimeException wrapping any exception raised by the context
+    */
+   @Override
+   public void encodeBody(final HornetQBuffer bufferOut, final LargeMessageEncodingContext context, final int size)
+   {
+      try
+      {
+         context.write(bufferOut, size);
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e);
+      }
+   }
+
+   /**
+    * Encoding context for a body held in memory by the enclosing message: it remembers how
+    * far into the body earlier chunks have already read.
+    */
+   class DecodingContext implements LargeMessageEncodingContext
+   {
+      /** Index in the body at which the next chunk starts. */
+      private int lastPos = 0;
+
+      /** Nothing to open for an in-memory body. */
+      public void open() throws Exception
+      {
+      }
+
+      /** Nothing to close for an in-memory body. */
+      public void close() throws Exception
+      {
+      }
+
+      /**
+       * Not supported by this context.
+       *
+       * @return always {@code -1}
+       */
+      public int write(final ByteBuffer bufferRead) throws Exception
+      {
+         return -1;
+      }
+
+      /**
+       * Copies {@code size} bytes of the enclosing message's body, starting at the current
+       * index, into the given buffer and then moves the index forward by {@code size}.
+       *
+       * @return {@code size}
+       */
+      public int write(final HornetQBuffer bufferRead, final int size)
+      {
+         final int chunkStart = lastPos;
+
+         bufferRead.writeBytes(getBody(), chunkStart, size);
+         lastPos = chunkStart + size;
+
+         return size;
+      }
+   }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-03 14:32:35 UTC
(rev 8192)
+++ trunk/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-03 15:00:57 UTC
(rev 8193)
@@ -13,6 +13,8 @@
package org.hornetq.core.server;
+import org.hornetq.core.message.LargeMessageEncodingContext;
+
/**
* A LargeMessage
*
@@ -41,4 +43,6 @@
void incrementDelayDeletionCount();
void decrementDelayDeletionCount() throws Exception;
+
+ LargeMessageEncodingContext createNewContext();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-03
14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-03
15:00:57 UTC (rev 8193)
@@ -36,15 +36,10 @@
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.server.HandleStatus;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.MessageReference;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.ServerConsumer;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.*;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.utils.TypedProperties;
/**
@@ -643,6 +638,8 @@
/** The current position on the message being processed */
private volatile long positionPendingLargeMessage;
+ private LargeMessageEncodingContext context;
+
public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref)
throws Exception
{
@@ -689,6 +686,8 @@
headerBuffer.array(),
pendingLargeMessage.getLargeBodySize(),
ref.getDeliveryCount());
+ context = pendingLargeMessage.createNewContext();
+ context.open();
}
int precalculateAvailableCredits;
@@ -726,7 +725,7 @@
return false;
}
- SessionReceiveContinuationMessage chunk = createChunkSend();
+ SessionReceiveContinuationMessage chunk = createChunkSend(context);
int chunkLen = chunk.getBody().length;
@@ -759,7 +758,7 @@
{
trace("Finished deliverLargeMessage");
}
-
+ context.close();
finish();
return true;
@@ -826,7 +825,7 @@
}
}
- private SessionReceiveContinuationMessage createChunkSend()
+ private SessionReceiveContinuationMessage createChunkSend(LargeMessageEncodingContext context)
{
SessionReceiveContinuationMessage chunk;
@@ -836,7 +835,8 @@
HornetQBuffer bodyBuffer = ChannelBuffers.buffer(localChunkLen);
- pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
+ //pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
+ pendingLargeMessage.encodeBody(bodyBuffer, context, localChunkLen);
chunk = new SessionReceiveContinuationMessage(id,
bodyBuffer.array(),
Modified:
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-03
14:32:35 UTC (rev 8192)
+++
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-03
15:00:57 UTC (rev 8193)
@@ -365,7 +365,7 @@
log.debug("Read " + b + " bytes");
}
- assertEquals(getSamplebyte(b), buffer.readByte());
+ assertEquals("byte pos" + b + " is incorrect", getSamplebyte(b), buffer.readByte());
}
}
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-03
14:32:35 UTC (rev 8192)
+++
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-03
15:00:57 UTC (rev 8193)
@@ -596,6 +596,11 @@
{
}
+ /**
+  * Not implemented by this fake: no copy is created and {@code null} is always returned.
+  */
+ public SequentialFile copy()
+ {
+ return null; // placeholder: the fake does not support copying
+ }
+
/* (non-Javadoc)
* @see
org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.remoting.spi.HornetQBuffer,
boolean, org.hornetq.core.journal.IOCallback)
*/
@@ -697,4 +702,5 @@
{
}
+
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-03
14:32:35 UTC (rev 8192)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-03
15:00:57 UTC (rev 8193)
@@ -27,14 +27,11 @@
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.postoffice.impl.BindingsImpl;
import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.server.Bindable;
-import org.hornetq.core.server.MessageReference;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.RoutingContext;
-import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.*;
import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -456,12 +453,9 @@
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.message.Message#encodeBody(org.hornetq.core.remoting.spi.HornetQBuffer,
long, int)
- */
- public void encodeBody(final HornetQBuffer buffer, final long start, final int size)
+ public void encodeBody(HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
{
-
+ // No-op: this test stub writes nothing to bufferOut and ignores the context.
}
/* (non-Javadoc)