[hornetq-commits] JBoss hornetq SVN: r8248 - in trunk: src/main/org/hornetq/core/message and 10 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 6 19:33:25 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-06 19:33:24 -0500 (Fri, 06 Nov 2009)
New Revision: 8248

Added:
   trunk/src/main/org/hornetq/core/message/BodyEncoder.java
   trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
Removed:
   trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
   trunk/src/main/org/hornetq/core/message/Message.java
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
   trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
   trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java
   trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java
   trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-211 - ClientProducer to send LargeMessages and a few other tweaks

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -15,10 +15,12 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.client.LargeMessageBuffer;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.utils.SimpleString;
@@ -228,5 +230,8 @@
              "]";
    }
 
-
+   /* (non-Javadoc)
+    * @see org.hornetq.core.message.Message#getBodyEncoder()
+    */
+   
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -17,12 +17,11 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.Message;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.remoting.Channel;
@@ -347,47 +346,59 @@
    {
       final long bodySize = msg.getLargeBodySize();
 
-      LargeMessageEncodingContext context = new DecodingContext(msg);
+      BodyEncoder context = msg.getBodyEncoder();
 
-      for (int pos = 0; pos < bodySize;)
+      context.open();
+      try
       {
-         final boolean lastChunk;
 
-         final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
+         for (int pos = 0; pos < bodySize;)
+         {
+            final boolean lastChunk;
 
-         final HornetQBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
+            final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
 
-         msg.encodeBody(bodyBuffer, context, chunkLength);
+            final HornetQBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
 
-         pos += chunkLength;
+            context.encode(bodyBuffer, chunkLength);
 
-         lastChunk = pos >= bodySize;
+            pos += chunkLength;
 
-         final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(),
-                                                                                         !lastChunk,
-                                                                                         lastChunk && sendBlocking);
+            lastChunk = pos >= bodySize;
 
-         if (sendBlocking && lastChunk)
-         {
-            // When sending it blocking, only the last chunk will be blocking.
-            channel.sendBlocking(chunk);
-         }
-         else
-         {
-            channel.send(chunk);
-         }
+            final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(),
+                                                                                            !lastChunk,
+                                                                                            lastChunk && sendBlocking);
 
-         try
-         {
-            credits.acquireCredits(chunk.getRequiredBufferSize());
+            if (sendBlocking && lastChunk)
+            {
+               // When sending it blocking, only the last chunk will be blocking.
+               channel.sendBlocking(chunk);
+            }
+            else
+            {
+               channel.send(chunk);
+            }
+
+            try
+            {
+               credits.acquireCredits(chunk.getRequiredBufferSize());
+            }
+            catch (InterruptedException e)
+            {
+            }
          }
-         catch (InterruptedException e)
-         {
-         }
       }
+      finally
+      {
+         context.close();
+      }
    }
 
    /**
+    * TODO: This method could be eliminated and
+    *       combined with {@link ClientProducerImpl#largeMessageSendBuffered(boolean, Message, ClientProducerCredits)}.
+    *       All that's needed is for ClientMessage to return the proper BodyEncoder for streamed messages.
     * @param sendBlocking
     * @param input
     * @throws HornetQException
@@ -477,35 +488,4 @@
    }
 
    // Inner Classes --------------------------------------------------------------------------------
-   class DecodingContext implements LargeMessageEncodingContext
-   {
-      private final Message message;
-
-      private int lastPos = 0;
-
-      public DecodingContext(Message message)
-      {
-         this.message = message;
-      }
-
-      public void open() throws Exception
-      {
-      }
-
-      public void close() throws Exception
-      {
-      }
-
-      public int write(ByteBuffer bufferRead) throws Exception
-      {
-         return -1;
-      }
-
-      public int write(HornetQBuffer bufferOut, int size)
-      {
-         bufferOut.writeBytes(message.getBody(), lastPos, size);
-         lastPos += size;
-         return size;
-      }
-   }
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -58,7 +58,7 @@
 
    private final LinkedBlockingQueue<SessionReceiveContinuationMessage> packets = new LinkedBlockingQueue<SessionReceiveContinuationMessage>();
 
-   private SessionReceiveContinuationMessage currentPacket = null;
+   private volatile SessionReceiveContinuationMessage currentPacket = null;
 
    private final long totalSize;
 

Copied: trunk/src/main/org/hornetq/core/message/BodyEncoder.java (from rev 8244, trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java)
===================================================================
--- trunk/src/main/org/hornetq/core/message/BodyEncoder.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/message/BodyEncoder.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+
+package org.hornetq.core.message;
+
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Class used to encode message body into buffers.
+ * Used to send large streams over the wire
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *         Created Nov 2, 2009
+ */
+public interface BodyEncoder
+{
+   void open() throws HornetQException;
+
+   void close() throws HornetQException;
+
+   int encode(ByteBuffer bufferRead) throws HornetQException;
+
+   int encode(HornetQBuffer bufferOut, int size) throws HornetQException;
+}

Deleted: trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -1,20 +0,0 @@
-package org.hornetq.core.message;
-
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-
-import java.nio.ByteBuffer;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- *         Created Nov 2, 2009
- */
-public interface LargeMessageEncodingContext
-{
-   void open() throws Exception;
-
-   void close() throws Exception;
-
-   int write(ByteBuffer bufferRead) throws Exception;
-
-   int write(HornetQBuffer bufferOut, int size);
-}

Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/message/Message.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -78,10 +78,10 @@
    boolean isLargeMessage();
 
    long getLargeBodySize();
+   
+   /** Used to encode Body over the wire when using large messages */
+   BodyEncoder getBodyEncoder();
 
-   // Used on Message chunk
-   void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size);
-
    /** Set the InputStream used on a message that will be sent over a producer */
    void setBodyInputStream(InputStream stream);
 

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -19,13 +19,15 @@
 import static org.hornetq.utils.DataConstants.SIZE_LONG;
 
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
 import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.Message;
 import org.hornetq.core.message.PropertyConversionException;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -214,12 +216,6 @@
       buffer.writeBytes(localBody.array(), 0, localBody.writerIndex());
    }
 
-   // Used on Message chunk side
-   public void encodeBody(final HornetQBuffer bufferOut, final LargeMessageEncodingContext context, final int size)
-   {
-      context.write(bufferOut, size);
-   }
-
    public void decode(final HornetQBuffer buffer)
    {
       decodeHeadersAndProperties(buffer);
@@ -663,6 +659,11 @@
    {
       this.body = body;
    }
+   
+   public BodyEncoder getBodyEncoder()
+   {
+      return new DecodingContext();
+   }
 
    // Public --------------------------------------------------------
 
@@ -673,4 +674,36 @@
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
+
+   
+   class DecodingContext implements BodyEncoder
+   {
+      private int lastPos = 0;
+
+      public DecodingContext()
+      {
+      }
+
+      public void open()
+      {
+      }
+
+      public void close()
+      {
+      }
+
+      public int encode(ByteBuffer bufferRead) throws HornetQException
+      {
+         HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(bufferRead);
+         return encode(buffer, bufferRead.capacity());
+      }
+
+      public int encode(HornetQBuffer bufferOut, int size)
+      {
+         bufferOut.writeBytes(getBody(), lastPos, size);
+         lastPos += size;
+         return size;
+      }
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -15,9 +15,11 @@
 
 import static org.hornetq.utils.DataConstants.SIZE_INT;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagingStore;
@@ -25,7 +27,7 @@
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 
 /**
@@ -55,7 +57,7 @@
    private SequentialFile file;
 
    private long bodySize = -1;
-   
+
    private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
 
    // Static --------------------------------------------------------
@@ -64,7 +66,7 @@
 
    public FileLargeServerMessage(final JournalStorageManager storageManager)
    {
-      this.storageManager = storageManager;      
+      this.storageManager = storageManager;
    }
 
    /**
@@ -72,16 +74,14 @@
     * @param copy
     * @param fileCopy
     */
-   private FileLargeServerMessage(final FileLargeServerMessage copy,
-                                     final SequentialFile fileCopy,
-                                     final long newID)
+   private FileLargeServerMessage(final FileLargeServerMessage copy, final SequentialFile fileCopy, final long newID)
    {
       super(copy);
       this.linkMessage = copy;
       storageManager = copy.storageManager;
       file = fileCopy;
       bodySize = copy.bodySize;
-      setMessageID(newID);      
+      setMessageID(newID);
    }
 
    // Public --------------------------------------------------------
@@ -92,7 +92,7 @@
    public synchronized void addBytes(final byte[] bytes) throws Exception
    {
       validateFile();
-      
+
       if (!file.isOpen())
       {
          file.open();
@@ -103,14 +103,14 @@
       bodySize += bytes.length;
    }
 
-   public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
+   public void encodeBody(final HornetQBuffer bufferOut, BodyEncoder context, int size)
    {
       try
       {
          // This could maybe be optimized (maybe reading directly into bufferOut)
          ByteBuffer bufferRead = ByteBuffer.allocate(size);
 
-         int bytesRead = context.write(bufferRead);
+         int bytesRead = context.encode(bufferRead);
 
          bufferRead.flip();
 
@@ -188,13 +188,13 @@
       }
    }
 
-   public LargeMessageEncodingContext createNewContext()
+   public BodyEncoder getBodyEncoder()
    {
       return new DecodingContext();
    }
 
    private void checkDelete() throws Exception
-   {      
+   {
       if (getRefCount() <= 0)
       {
          if (linkMessage != null)
@@ -301,9 +301,9 @@
       SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
 
       ServerMessage newMessage = new FileLargeServerMessage(linkMessage == null ? this
-                                                                                  : (FileLargeServerMessage)linkMessage,
-                                                               newfile,
-                                                               newID);
+                                                                               : (FileLargeServerMessage)linkMessage,
+                                                            newfile,
+                                                            newID);
 
       return newMessage;
    }
@@ -334,7 +334,7 @@
          {
             throw new RuntimeException("MessageID not set on LargeMessage");
          }
-        
+
          file = storageManager.createFileForLargeMessage(getMessageID(), durable);
 
          file.open();
@@ -372,29 +372,62 @@
 
    // Inner classes -------------------------------------------------
 
-   class DecodingContext implements LargeMessageEncodingContext
+   class DecodingContext implements BodyEncoder
    {
       private SequentialFile cFile;
-      
-      public void open() throws Exception
+
+      public void open() throws HornetQException
       {
-         cFile = file.copy();
-         cFile.open();
+         try
+         {
+            cFile = file.copy();
+            cFile.open();
+         }
+         catch (Exception e)
+         {
+            throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
+         }
       }
 
-      public void close() throws Exception
+      public void close() throws HornetQException
       {
-         cFile.close();
+         try
+         {
+            cFile.close();
+         }
+         catch (Exception e)
+         {
+            throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
+         }
       }
 
-      public int write(ByteBuffer bufferRead) throws Exception
+      public int encode(ByteBuffer bufferRead) throws HornetQException
       {
-         return cFile.read(bufferRead);
+         try
+         {
+            return cFile.read(bufferRead);
+         }
+         catch (Exception e)
+         {
+            throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
+         }
       }
 
-      public int write(HornetQBuffer bufferOut, int size)
+      public int encode(HornetQBuffer bufferOut, int size) throws HornetQException
       {
-         return -1;
+         // This could maybe be optimized (maybe reading directly into bufferOut)
+         ByteBuffer bufferRead = ByteBuffer.allocate(size);
+
+         int bytesRead = encode(bufferRead);
+
+         bufferRead.flip();
+
+         if (bytesRead > 0)
+         {
+            bufferOut.writeBytes(bufferRead.array(), 0, bytesRead);
+         }
+
+         return bytesRead;
       }
    }
 }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -16,11 +16,8 @@
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 
-import java.nio.ByteBuffer;
-
 /**
  * A NullStorageLargeServerMessage
  *
@@ -160,49 +157,6 @@
       return getHeadersAndPropertiesEncodeSize();
    }
 
-   public LargeMessageEncodingContext createNewContext()
-   {
-      return new DecodingContext();
-   }
-
-   @Override
-   public void encodeBody(HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
-   {
-      DecodingContext decodingContext = (DecodingContext) context;
-      try
-      {
-         decodingContext.write(bufferOut, size);
-      }
-      catch (Exception e)
-      {
-         throw new RuntimeException(e);
-      }
-   }
-
-   class DecodingContext implements LargeMessageEncodingContext
-   {
-      private int lastPos = 0;
-
-      public void open() throws Exception
-      {
-      }
-
-      public void close() throws Exception
-      {
-      }
-
-      public int write(final ByteBuffer bufferRead) throws Exception
-      {
-         return -1;
-      }
-
-      public int write(final HornetQBuffer bufferRead, final int size)
-      {
-         bufferRead.writeBytes(getBody(), lastPos, size);
-         lastPos += size;
-         return size;
-      }
-   }
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/LargeServerMessage.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/server/LargeServerMessage.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -13,7 +13,7 @@
 
 package org.hornetq.core.server;
 
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
 
 /**
  * A LargeMessage
@@ -43,6 +43,4 @@
    void incrementDelayDeletionCount();
    
    void decrementDelayDeletionCount() throws Exception;
-
-   LargeMessageEncodingContext createNewContext();
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -24,12 +24,13 @@
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.client.impl.ClientConsumerImpl;
 import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.management.Notification;
 import org.hornetq.core.management.NotificationType;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.QueueBinding;
@@ -632,7 +633,7 @@
       /** The current position on the message being processed */
       private long positionPendingLargeMessage;
 
-      private LargeMessageEncodingContext context;
+      private BodyEncoder context;
 
       public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception
       {
@@ -672,7 +673,7 @@
                                                                                largeMessage.getLargeBodySize(),
                                                                                ref.getDeliveryCount());
 
-               context = largeMessage.createNewContext();
+               context = largeMessage.getBodyEncoder();
 
                context.open();
 
@@ -783,7 +784,7 @@
          }
       }
 
-      private SessionReceiveContinuationMessage createChunkSend(final LargeMessageEncodingContext context)
+      private SessionReceiveContinuationMessage createChunkSend(final BodyEncoder context) throws HornetQException
       {
          SessionReceiveContinuationMessage chunk;
 
@@ -793,8 +794,7 @@
 
          HornetQBuffer bodyBuffer = ChannelBuffers.buffer(localChunkLen);
 
-         // pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
-         largeMessage.encodeBody(bodyBuffer, context, localChunkLen);
+         context.encode(bodyBuffer, localChunkLen);
 
          chunk = new SessionReceiveContinuationMessage(id,
                                                        bodyBuffer.array(),

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -31,6 +31,8 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.Message;
+import org.hornetq.core.persistence.impl.journal.FileLargeServerMessage;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
@@ -2080,6 +2082,69 @@
       }
    }
 
+   // The ClientProducer should also be able to send ServerLargeMessages, as that's what the CoreBridge does
+   public void testSendServerMessage() throws Exception
+   {
+      HornetQServer server = createServer(true);
+      
+      server.start();
+      
+      ClientSessionFactory sf = createFactory(false);
+      
+      ClientSession session = sf.createSession(false, false);
+      
+      try
+      {
+         FileLargeServerMessage fileMessage = new FileLargeServerMessage((JournalStorageManager)server.getStorageManager());
+         
+         fileMessage.setMessageID(1005);
+         
+         for (int i = 0 ; i < LARGE_MESSAGE_SIZE; i++)
+         {
+            fileMessage.addBytes(new byte[]{getSamplebyte(i)});
+         }
+         
+         fileMessage.releaseResources();
+         
+         session.createQueue(ADDRESS, ADDRESS, true);
+         
+         ClientProducer prod = session.createProducer(ADDRESS);
+         
+         prod.send(fileMessage);
+         
+         fileMessage.deleteFile();
+         
+         session.commit();
+                  
+         session.start();
+         
+         ClientConsumer cons = session.createConsumer(ADDRESS);
+         
+         ClientMessage msg = cons.receive(5000);
+         
+         assertNotNull(msg);
+         
+         assertEquals(msg.getBodySize(), LARGE_MESSAGE_SIZE);
+         
+         for (int i = 0 ; i < LARGE_MESSAGE_SIZE; i++)
+         {
+            assertEquals(getSamplebyte(i), msg.getBody().readByte());
+         }
+         
+         msg.acknowledge();
+         
+         session.commit();
+         
+      }
+      finally
+      {
+         sf.close();
+         server.stop();
+      }
+   }
+
+   
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -59,8 +59,7 @@
       internaltestSimpleBridge(false, true);
    }
 
-   // Commented out by Clebert - I'm investigating this failure.. so I've set as disabled
-   public void disabled_testSimpleBridgeLargeMessageNullPersistence() throws Exception
+   public void testSimpleBridgeLargeMessageNullPersistence() throws Exception
    {
       internaltestSimpleBridge(true, false);
    }
@@ -234,8 +233,7 @@
       internalTestWithFilter(false, true);
    }
 
-   // Commented out by Clebert - I'm investigating this failure.. so I've set as disabled
-   public void disabled_testWithFilterLargeMessages() throws Exception
+   public void testWithFilterLargeMessages() throws Exception
    {
       internalTestWithFilter(true, false);
    }

Added: trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.largemessage;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.persistence.impl.journal.FileLargeServerMessage;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A ServerLargeMessageTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ServerLargeMessageTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   // The ClientProducer should also be able to send ServerLargeMessages, as that's done by the CoreBridge
+   public void testSendServerMessage() throws Exception
+   {
+      HornetQServer server = createServer(true);
+      
+      server.start();
+      
+      ClientSessionFactory sf = createFactory(false);
+      
+      ClientSession session = sf.createSession(false, false);
+      
+      try
+      {
+         FileLargeServerMessage fileMessage = new FileLargeServerMessage((JournalStorageManager)server.getStorageManager());
+         
+         fileMessage.setMessageID(1005);
+         
+         for (int i = 0 ; i < 2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+         {
+            fileMessage.addBytes(new byte[]{getSamplebyte(i)});
+         }
+         
+         fileMessage.releaseResources();
+         
+         session.createQueue("A", "A");
+         
+         ClientProducer prod = session.createProducer("A");
+         
+         prod.send(fileMessage);
+         
+         fileMessage.deleteFile();
+         
+         session.commit();
+                  
+         session.start();
+         
+         ClientConsumer cons = session.createConsumer("A");
+         
+         ClientMessage msg = cons.receive(5000);
+         
+         assertNotNull(msg);
+         
+         assertEquals(msg.getBodySize(), 2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+         
+         for (int i = 0 ; i < 2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+         {
+            assertEquals(getSamplebyte(i), msg.getBody().readByte());
+         }
+         
+         msg.acknowledge();
+         
+         session.commit();
+         
+      }
+      finally
+      {
+         sf.close();
+         server.stop();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -17,7 +17,6 @@
 
 import junit.framework.TestSuite;
 
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
@@ -44,6 +43,7 @@
       super.setUp();
    }
 
+   @Override
    protected SequentialFileFactory getFileFactory() throws Exception
    {
       File file = new File(getTestDir());

Modified: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -25,9 +25,9 @@
  */
 public class FakeJournalImplTest extends JournalImplTestUnit
 {
-	protected SequentialFileFactory getFileFactory() throws Exception
-	{
-		return new FakeSequentialFileFactory();
-	}
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      return new FakeSequentialFileFactory();
+   }
 }
-

Modified: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -32,43 +32,44 @@
 public abstract class JournalImplTestUnit extends JournalImplTestBase
 {
    private static final Logger log = Logger.getLogger(JournalImplTestUnit.class);
-   
+
+   @Override
    protected void tearDown() throws Exception
    {
       super.tearDown();
-      
+
       assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
    }
-   
+
    public void testAddUpdateDeleteManyLargeFileSize() throws Exception
    {
       final int numberAdds = 10000;
-      
+
       final int numberUpdates = 5000;
-      
+
       final int numberDeletes = 3000;
-                  
+
       long[] adds = new long[numberAdds];
-      
+
       for (int i = 0; i < numberAdds; i++)
       {
          adds[i] = i;
       }
-      
+
       long[] updates = new long[numberUpdates];
-      
+
       for (int i = 0; i < numberUpdates; i++)
       {
          updates[i] = i;
       }
-      
+
       long[] deletes = new long[numberDeletes];
-      
+
       for (int i = 0; i < numberDeletes; i++)
       {
          deletes[i] = i;
       }
-      
+
       setup(10, 10 * 1024 * 1024, true);
       createJournal();
       startJournal();
@@ -80,38 +81,38 @@
       createJournal();
       startJournal();
       loadAndCheck();
-      
+
    }
-   
+
    public void testAddUpdateDeleteManySmallFileSize() throws Exception
    {
       final int numberAdds = 10000;
-      
+
       final int numberUpdates = 5000;
-      
+
       final int numberDeletes = 3000;
-                  
+
       long[] adds = new long[numberAdds];
-      
+
       for (int i = 0; i < numberAdds; i++)
       {
          adds[i] = i;
       }
-      
+
       long[] updates = new long[numberUpdates];
-      
+
       for (int i = 0; i < numberUpdates; i++)
       {
          updates[i] = i;
       }
-      
+
       long[] deletes = new long[numberDeletes];
-      
+
       for (int i = 0; i < numberDeletes; i++)
       {
          deletes[i] = i;
       }
-      
+
       setup(10, 10 * 1024, true);
       createJournal();
       startJournal();
@@ -124,57 +125,53 @@
       createJournal();
       startJournal();
       loadAndCheck();
-      
+
    }
-   
+
    public void testReclaimAndReload() throws Exception
    {
       setup(2, 10 * 1024 * 1024, false);
       createJournal();
       startJournal();
       load();
-      
+
       long start = System.currentTimeMillis();
-      
-                  
+
       byte[] record = generateRecord(recordLength);
-      
+
       int NUMBER_OF_RECORDS = 1000;
 
       for (int count = 0; count < NUMBER_OF_RECORDS; count++)
       {
          journal.appendAddRecord(count, (byte)0, record, false);
-         
+
          if (count >= NUMBER_OF_RECORDS / 2)
          {
             journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2, false);
          }
-         
+
          if (count % 100 == 0)
          {
             log.debug("Done: " + count);
          }
       }
-      
+
       long end = System.currentTimeMillis();
-      
-      double rate = 1000 * ((double)NUMBER_OF_RECORDS) / (end - start);
-      
+
+      double rate = 1000 * (double)NUMBER_OF_RECORDS / (end - start);
+
       log.debug("Rate of " + rate + " adds/removes per sec");
-      
+
       log.debug("Reclaim status = " + debugJournal());
-               
+
       stopJournal();
       createJournal();
       startJournal();
       journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>(), null);
-      
+
       assertEquals(NUMBER_OF_RECORDS / 2, journal.getIDMapSize());
-      
+
       stopJournal();
    }
-   
-   
-}
 
-
+}

Modified: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -29,23 +29,22 @@
  */
 public class NIOJournalImplTest extends JournalImplTestUnit
 {
-	private static final Logger log = Logger.getLogger(NIOJournalImplTest.class);
-	
-	protected String journalDir = System.getProperty("user.home") + "/journal-test";
-		
-	protected SequentialFileFactory getFileFactory() throws Exception
-	{
-		File file = new File(journalDir);
-		
-		log.debug("deleting directory " + journalDir);
-		
-		deleteDirectory(file);
-		
-		file.mkdir();		
-		
-		return new NIOSequentialFileFactory(journalDir);
-	}
-	
-	
-}
+   private static final Logger log = Logger.getLogger(NIOJournalImplTest.class);
 
+   protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      File file = new File(journalDir);
+
+      log.debug("deleting directory " + journalDir);
+
+      deleteDirectory(file);
+
+      file.mkdir();
+
+      return new NIOSequentialFileFactory(journalDir);
+   }
+
+}

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-07 00:33:24 UTC (rev 8248)
@@ -22,7 +22,7 @@
 
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.filter.Filter;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.PropertyConversionException;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.postoffice.Binding;
@@ -458,7 +458,7 @@
 
       }
 
-      public void encodeBody(HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
+      public void encodeBody(HornetQBuffer bufferOut, BodyEncoder context, int size)
       {
          // To change body of implemented methods use File | Settings | File Templates.
       }
@@ -1116,6 +1116,15 @@
          return false;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.message.Message#getBodyEncoder()
+       */
+      public BodyEncoder getBodyEncoder()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
    }
 
    class FakeFilter implements Filter



More information about the hornetq-commits mailing list