[hornetq-commits] JBoss hornetq SVN: r8193 - in trunk: src/main/org/hornetq/core/journal and 10 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 3 10:00:58 EST 2009


Author: ataylor
Date: 2009-11-03 10:00:57 -0500 (Tue, 03 Nov 2009)
New Revision: 8193

Added:
   trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/journal/SequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   trunk/src/main/org/hornetq/core/message/Message.java
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
   trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-190 - each LargeMessageDeliverer now uses its own file handle

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -17,11 +17,13 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.Message;
+import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
@@ -368,6 +370,8 @@
       {
          final long bodySize = msg.getLargeBodySize();
 
+         LargeMessageEncodingContext context = new DecodingContext(msg);
+
          for (int pos = 0; pos < bodySize;)
          {
             final boolean lastChunk;
@@ -376,7 +380,7 @@
 
             final HornetQBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
 
-            msg.encodeBody(bodyBuffer, pos, chunkLength);
+            msg.encodeBody(bodyBuffer, context, chunkLength);
 
             pos += chunkLength;
 
@@ -408,5 +412,34 @@
    }
 
    // Inner Classes --------------------------------------------------------------------------------
+   class DecodingContext implements LargeMessageEncodingContext
+   {
+      private final Message message;
+      private int lastPos = 0;
 
+      public DecodingContext(Message message)
+      {
+         this.message = message;
+      }
+
+      public void open() throws Exception
+      {
+      }
+
+      public void close() throws Exception
+      {
+      }
+
+      public int write(ByteBuffer bufferRead) throws Exception
+      {
+         return -1;
+      }
+
+      public int write(HornetQBuffer bufferOut, int size)
+      {
+         bufferOut.writeBytes(message.getBody(), lastPos, size);
+         lastPos += size;
+         return size;
+      }
+   }
 }

Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -85,4 +85,5 @@
 
    void enableAutoFlush();
 
+   SequentialFile copy();
 }

Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -29,6 +29,7 @@
 import org.hornetq.core.asyncio.impl.TimedBufferObserver;
 import org.hornetq.core.journal.IOCallback;
 import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 
@@ -126,6 +127,11 @@
       timedBuffer.enableAutoFlush();
    }
 
+   public SequentialFile copy()
+   {
+      return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(), maxIO, bufferCallback, executor, pollerExecutor);
+   }
+
    public synchronized void close() throws Exception
    {
       if (!opened)

Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -20,6 +20,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 
@@ -48,6 +49,11 @@
       super(directory, new File(directory + "/" + fileName));
    }
 
+   public NIOSequentialFile(File file)
+   {
+      super(file.getParent(), new File(file.getPath()));
+   }
+
    public int getAlignment()
    {
       return 1;
@@ -265,4 +271,8 @@
    {
    }
 
+   public SequentialFile copy()
+   {
+      return new NIOSequentialFile(getFile());
+   }
 }

Added: trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -0,0 +1,20 @@
+package org.hornetq.core.message;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *         Created Nov 2, 2009
+ */
+public interface LargeMessageEncodingContext
+{
+   void open() throws Exception;
+
+   void close() throws Exception;
+
+   int write(ByteBuffer bufferRead) throws Exception;
+
+   int write(HornetQBuffer bufferOut, int size);
+}

Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/message/Message.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -18,6 +18,7 @@
 import java.util.Set;
 
 import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.TypedProperties;
 
@@ -81,7 +82,7 @@
    long getLargeBodySize();
          
    // Used on Message chunk
-   void encodeBody(HornetQBuffer buffer, long start, int size);
+   void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size);
    
    /** Set the InputStream used on a message that will be sent over a producer */
    void setBodyInputStream(InputStream stream);

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -27,6 +27,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.Message;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.TypedProperties;
 
@@ -213,10 +214,10 @@
       buffer.writeBytes(localBody.array(), 0, localBody.writerIndex());
    }
 
-   // Used on Message chunk
-   public void encodeBody(HornetQBuffer buffer, long start, int size)
+   // Used on Message chunk side
+   public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
    {
-      buffer.writeBytes(body, (int)start, size);
+      context.write(bufferOut, size);
    }
 
    public void decode(final HornetQBuffer buffer)

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -25,6 +25,7 @@
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 
 /**
@@ -105,25 +106,15 @@
       bodySize += bytes.length;
    }
 
-   @Override
-   public synchronized void encodeBody(final HornetQBuffer bufferOut, final long start, final int size)
+   public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
    {
       try
       {
-         validateFile();
-
          // This could maybe be optimized (maybe reading directly into bufferOut)
          ByteBuffer bufferRead = ByteBuffer.allocate(size);
-         if (!file.isOpen())
-         {
-            file.open();
-         }
 
-         int bytesRead = 0;
-         file.position(start);
+         int bytesRead = context.write(bufferRead);
 
-         bytesRead = file.read(bufferRead);
-
          bufferRead.flip();
 
          if (bytesRead > 0)
@@ -200,6 +191,11 @@
       }
    }
 
+   public LargeMessageEncodingContext createNewContext()
+   {
+      return new DecodingContext();
+   }
+
    private void checkDelete() throws Exception
    {      
       if (getRefCount() <= 0)
@@ -375,11 +371,33 @@
       {
          throw new RuntimeException("could not setup linked file", e);
       }
-      finally
-      {
-      }
    }
 
    // Inner classes -------------------------------------------------
 
+   class DecodingContext implements LargeMessageEncodingContext
+   {
+      private SequentialFile cFile;
+      
+      public void open() throws Exception
+      {
+         cFile = file.copy();
+         cFile.open();
+      }
+
+      public void close() throws Exception
+      {
+         cFile.close();
+      }
+
+      public int write(ByteBuffer bufferRead) throws Exception
+      {
+         return cFile.read(bufferRead);
+      }
+
+      public int write(HornetQBuffer bufferOut, int size)
+      {
+         return -1;
+      }
+   }
 }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -16,8 +16,11 @@
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 
+import java.nio.ByteBuffer;
+
 /**
  * A NullStorageLargeServerMessage
  *
@@ -151,6 +154,49 @@
 
    }
 
+   public LargeMessageEncodingContext createNewContext()
+   {
+      return new DecodingContext();
+   }
+
+   @Override
+   public void encodeBody(HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
+   {
+      DecodingContext decodingContext = (DecodingContext) context;
+      try
+      {
+         decodingContext.write(bufferOut, size);
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e);
+      }
+   }
+
+   class DecodingContext implements LargeMessageEncodingContext
+   {
+      private int lastPos = 0;
+
+      public void open() throws Exception
+      {
+      }
+
+      public void close() throws Exception
+      {
+      }
+
+      public int write(final ByteBuffer bufferRead) throws Exception
+      {
+         return -1;
+      }
+
+      public int write(final HornetQBuffer bufferRead, final int size)
+      {
+         bufferRead.writeBytes(getBody(), lastPos, size);
+         lastPos += size;
+         return size;
+      }
+   }
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/LargeServerMessage.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/server/LargeServerMessage.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.server;
 
+import org.hornetq.core.message.LargeMessageEncodingContext;
+
 /**
  * A LargeMessage
  *
@@ -41,4 +43,6 @@
    void incrementDelayDeletionCount();
    
    void decrementDelayDeletionCount() throws Exception;
+
+   LargeMessageEncodingContext createNewContext();
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -36,15 +36,10 @@
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.server.HandleStatus;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.MessageReference;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.ServerConsumer;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.*;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.utils.TypedProperties;
 
 /**
@@ -643,6 +638,8 @@
       /** The current position on the message being processed */
       private volatile long positionPendingLargeMessage;
 
+      private LargeMessageEncodingContext context;
+
       public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref)
          throws Exception
       {
@@ -689,6 +686,8 @@
                                                           headerBuffer.array(),
                                                           pendingLargeMessage.getLargeBodySize(),
                                                           ref.getDeliveryCount());
+               context = pendingLargeMessage.createNewContext();
+               context.open();
             }
 
             int precalculateAvailableCredits;
@@ -726,7 +725,7 @@
                   return false;
                }
 
-               SessionReceiveContinuationMessage chunk = createChunkSend();
+               SessionReceiveContinuationMessage chunk = createChunkSend(context);
 
                int chunkLen = chunk.getBody().length;
 
@@ -759,7 +758,7 @@
             {
                trace("Finished deliverLargeMessage");
             }
-
+            context.close();
             finish();
 
             return true;
@@ -826,7 +825,7 @@
          }
       }
 
-      private SessionReceiveContinuationMessage createChunkSend()
+      private SessionReceiveContinuationMessage createChunkSend(LargeMessageEncodingContext context)
       {
          SessionReceiveContinuationMessage chunk;
 
@@ -836,7 +835,8 @@
 
          HornetQBuffer bodyBuffer = ChannelBuffers.buffer(localChunkLen);
 
-         pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
+         //pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
+         pendingLargeMessage.encodeBody(bodyBuffer, context, localChunkLen);
 
          chunk = new SessionReceiveContinuationMessage(id,
                                                        bodyBuffer.array(),

Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -365,7 +365,7 @@
                                  log.debug("Read " + b + " bytes");
                               }
 
-                              assertEquals(getSamplebyte(b), buffer.readByte());
+                              assertEquals("byte pos" + b + " is incorrect", getSamplebyte(b), buffer.readByte());
                            }
                         }
                      }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -596,6 +596,11 @@
       {
       }
 
+      public SequentialFile copy()
+      {
+         return null;  //To change body of implemented methods use File | Settings | File Templates.
+      }
+
       /* (non-Javadoc)
        * @see org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.remoting.spi.HornetQBuffer, boolean, org.hornetq.core.journal.IOCallback)
        */
@@ -697,4 +702,5 @@
    {
    }
 
+
 }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-03 14:32:35 UTC (rev 8192)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-03 15:00:57 UTC (rev 8193)
@@ -27,14 +27,11 @@
 import org.hornetq.core.postoffice.BindingType;
 import org.hornetq.core.postoffice.impl.BindingsImpl;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.server.Bindable;
-import org.hornetq.core.server.MessageReference;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.RoutingContext;
-import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.*;
 import org.hornetq.core.server.impl.RoutingContextImpl;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.tests.util.UnitTestCase;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.TypedProperties;
@@ -456,12 +453,9 @@
 
       }
 
-      /* (non-Javadoc)
-       * @see org.hornetq.core.message.Message#encodeBody(org.hornetq.core.remoting.spi.HornetQBuffer, long, int)
-       */
-      public void encodeBody(final HornetQBuffer buffer, final long start, final int size)
+      public void encodeBody(HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
       {
-
+         //To change body of implemented methods use File | Settings | File Templates.
       }
 
       /* (non-Javadoc)



More information about the hornetq-commits mailing list