[hornetq-commits] JBoss hornetq SVN: r8078 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence/impl/journal and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Oct 10 19:35:45 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-10 19:35:44 -0400 (Sat, 10 Oct 2009)
New Revision: 8078

Added:
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java
Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Replication on large message

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -99,6 +99,8 @@
 
    LargeServerMessage createLargeMessage();
 
+   LargeServerMessage createLargeMessage(byte [] header);
+
    void prepare(long txID, Xid xid) throws Exception;
 
    void commit(long txID) throws Exception;

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -17,6 +17,8 @@
 
 import java.nio.ByteBuffer;
 
+import com.sun.org.apache.bcel.internal.generic.StoreInstruction;
+
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -91,11 +93,9 @@
       {
          file.open();
       }
+      
+      storageManager.addBytesToLargeMessage(file, this.getMessageID(), bytes);
 
-      file.position(file.size());
-
-      file.write(ByteBuffer.wrap(bytes), false);
-
       bodySize += bytes.length;
    }
 
@@ -232,6 +232,7 @@
    public synchronized void deleteFile() throws Exception
    {
       validateFile();
+      releaseResources();
       storageManager.deleteFile(file);
    }
    

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -18,6 +18,7 @@
 import static org.hornetq.utils.DataConstants.SIZE_LONG;
 
 import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -376,7 +377,35 @@
    {
       return new JournalLargeServerMessage(this);
    }
+   
+   public void addBytesToLargeMessage(SequentialFile file, long messageId, final byte[] bytes) throws Exception
+   {
+      file.position(file.size());
 
+      file.write(ByteBuffer.wrap(bytes), false);
+      
+      if (isReplicated())
+      {
+         this.replicator.largeMessageWrite(messageId, bytes);
+      }
+   }
+   
+   public LargeServerMessage createLargeMessage(byte [] header)
+   {
+      if (isReplicated())
+      {
+         replicator.largeMessageBegin(header);
+      }
+      
+      JournalLargeServerMessage largeMessage = new JournalLargeServerMessage(this);
+
+      HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
+
+      largeMessage.decodeProperties(headerBuffer);
+      
+      return largeMessage;     
+   }
+
    // Non transactional operations
 
    public void storeMessage(final ServerMessage message) throws Exception

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -19,6 +19,7 @@
 
 import javax.transaction.xa.Xid;
 
+import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
@@ -26,6 +27,7 @@
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
@@ -174,6 +176,18 @@
       return new NullStorageLargeServerMessage();
    }
    
+   public LargeServerMessage createLargeMessage(byte [] header)
+   {
+      NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage();
+
+      HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
+
+      largeMessage.decodeProperties(headerBuffer);
+      
+      return largeMessage;     
+   }
+   
+   
    public long generateUniqueID()
    {
       long id = idSequence.getAndIncrement();

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,6 +13,9 @@
 
 package org.hornetq.core.remoting.impl;
 
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_EVENT;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_WRITE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
@@ -87,6 +90,9 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
@@ -420,6 +426,21 @@
             packet = new ReplicationPageEventMessage();
             break;
          }
+         case REPLICATION_LARGE_MESSAGE_BEGIN:
+         {
+            packet = new ReplicationLargeMessageBeingMessage();
+            break;
+         }
+         case REPLICATION_LARGE_MESSAGE_END:
+         {
+            packet = new ReplicationLargemessageEndMessage();
+            break;
+         }
+         case REPLICATION_LARGE_MESSAGE_WRITE:
+         {
+            packet = new ReplicationLargeMessageWriteMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -159,7 +159,13 @@
 
    public static final byte REPLICATION_PAGE_EVENT = 88;
    
+   public static final byte REPLICATION_LARGE_MESSAGE_BEGIN = 89;
    
+   public static final byte REPLICATION_LARGE_MESSAGE_END = 90;
+   
+   public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 91;
+   
+   
 
    // Static --------------------------------------------------------
 

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -35,7 +35,7 @@
 
    /** 0 - Bindings, 1 - MessagesJournal */
    private byte journalID;
-   
+
    private boolean isUpdate;
 
    private byte recordType;
@@ -53,7 +53,11 @@
       super(REPLICATION_APPEND);
    }
 
-   public ReplicationAddMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
+   public ReplicationAddMessage(final byte journalID,
+                                final boolean isUpdate,
+                                final long id,
+                                final byte recordType,
+                                final EncodingSupport encodingData)
    {
       this();
       this.journalID = journalID;
@@ -65,10 +69,10 @@
 
    // Public --------------------------------------------------------
 
+   @Override
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + 
-             DataConstants.SIZE_BYTE +
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
              DataConstants.SIZE_BOOLEAN +
              DataConstants.SIZE_LONG +
              DataConstants.SIZE_BYTE +
@@ -115,7 +119,7 @@
    {
       return journalID;
    }
-   
+
    public boolean isUpdate()
    {
       return isUpdate;

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -32,12 +32,12 @@
    // Attributes ----------------------------------------------------
 
    private long txId;
-   
+
    private long id;
 
    /** 0 - Bindings, 1 - MessagesJournal */
    private byte journalID;
-   
+
    private boolean isUpdate;
 
    private byte recordType;
@@ -55,7 +55,12 @@
       super(REPLICATION_APPEND_TX);
    }
 
-   public ReplicationAddTXMessage(byte journalID, boolean isUpdate, long txId, long id, byte recordType, EncodingSupport encodingData)
+   public ReplicationAddTXMessage(final byte journalID,
+                                  final boolean isUpdate,
+                                  final long txId,
+                                  final long id,
+                                  final byte recordType,
+                                  final EncodingSupport encodingData)
    {
       this();
       this.journalID = journalID;
@@ -68,10 +73,10 @@
 
    // Public --------------------------------------------------------
 
+   @Override
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + 
-             DataConstants.SIZE_BYTE +
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
              DataConstants.SIZE_BOOLEAN +
              DataConstants.SIZE_LONG +
              DataConstants.SIZE_LONG +
@@ -113,7 +118,7 @@
    {
       return id;
    }
-   
+
    public long getTxId()
    {
       return txId;
@@ -126,7 +131,7 @@
    {
       return journalID;
    }
-   
+
    public boolean isUpdate()
    {
       return isUpdate;

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -46,7 +46,7 @@
       super(REPLICATION_COMMIT_ROLLBACK);
    }
 
-   public ReplicationCommitMessage(byte journalID, boolean rollback, long txId)
+   public ReplicationCommitMessage(final byte journalID, final boolean rollback, final long txId)
    {
       this();
       this.journalID = journalID;
@@ -56,6 +56,7 @@
 
    // Public --------------------------------------------------------
 
+   @Override
    public int getRequiredBufferSize()
    {
       return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_LONG;

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.remoting.impl.wireformat;
 
-import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.utils.DataConstants;
 
@@ -45,7 +44,7 @@
       super(REPLICATION_DELETE);
    }
 
-   public ReplicationDeleteMessage(byte journalID, long id)
+   public ReplicationDeleteMessage(final byte journalID, final long id)
    {
       this();
       this.journalID = journalID;
@@ -54,11 +53,10 @@
 
    // Public --------------------------------------------------------
 
+   @Override
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + 
-             DataConstants.SIZE_BYTE +
-             DataConstants.SIZE_LONG;
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
 
    }
 
@@ -92,7 +90,6 @@
       return journalID;
    }
 
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -32,7 +32,7 @@
    // Attributes ----------------------------------------------------
 
    private long txId;
-   
+
    private long id;
 
    /** 0 - Bindings, 1 - MessagesJournal */
@@ -51,7 +51,10 @@
       super(REPLICATION_DELETE_TX);
    }
 
-   public ReplicationDeleteTXMessage(byte journalID, long txId, long id, EncodingSupport encodingData)
+   public ReplicationDeleteTXMessage(final byte journalID,
+                                     final long txId,
+                                     final long id,
+                                     final EncodingSupport encodingData)
    {
       this();
       this.journalID = journalID;
@@ -62,10 +65,10 @@
 
    // Public --------------------------------------------------------
 
+   @Override
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + 
-             DataConstants.SIZE_BYTE +
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
              DataConstants.SIZE_LONG +
              DataConstants.SIZE_LONG +
              DataConstants.SIZE_INT +
@@ -101,7 +104,7 @@
    {
       return id;
    }
-   
+
    public long getTxId()
    {
       return txId;
@@ -114,7 +117,7 @@
    {
       return journalID;
    }
-   
+
    /**
     * @return the recordData
     */

Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageBeingMessage.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationLargeMessageBeingMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationLargeMessageBeingMessage extends PacketImpl
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   byte header[];
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationLargeMessageBeingMessage(final byte[] header)
+   {
+      this();
+      this.header = header;
+   }
+
+   public ReplicationLargeMessageBeingMessage()
+   {
+      super(REPLICATION_LARGE_MESSAGE_BEGIN);
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + header.length;
+   }
+
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeInt(header.length);
+      buffer.writeBytes(header);
+   }
+
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      int size = buffer.readInt();
+      header = new byte[size];
+      buffer.readBytes(header);
+   }
+
+   /**
+    * @return the header
+    */
+   public byte[] getHeader()
+   {
+      return header;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargeMessageWriteMessage.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationLargeMessageWriteMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationLargeMessageWriteMessage extends PacketImpl
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long messageId;
+
+   private byte body[];
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   public ReplicationLargeMessageWriteMessage()
+   {
+      super(REPLICATION_LARGE_MESSAGE_WRITE);
+   }
+
+   /**
+    * @param messageId
+    * @param body
+    */
+   public ReplicationLargeMessageWriteMessage(final long messageId, final byte[] body)
+   {
+      this();
+
+      this.messageId = messageId;
+      this.body = body;
+   }
+
+   // Public --------------------------------------------------------
+   @Override
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT + body.length;
+   }
+
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(messageId);
+      buffer.writeInt(body.length);
+      buffer.writeBytes(body);
+   }
+
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      messageId = buffer.readLong();
+      int size = buffer.readInt();
+      body = new byte[size];
+      buffer.readBytes(body);
+   }
+
+   /**
+    * @return the messageId
+    */
+   public long getMessageId()
+   {
+      return messageId;
+   }
+
+   /**
+    * @return the body
+    */
+   public byte[] getBody()
+   {
+      return body;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationLargemessageEndMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationLargemessageEndMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   long messageId;
+
+   boolean isDelete;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationLargemessageEndMessage()
+   {
+      super(REPLICATION_LARGE_MESSAGE_END);
+   }
+
+   public ReplicationLargemessageEndMessage(final long messageId, final boolean isDelete)
+   {
+      this();
+      this.messageId = messageId;
+      this.isDelete = isDelete;
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BOOLEAN;
+   }
+
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(messageId);
+      buffer.writeBoolean(isDelete);
+   }
+
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      messageId = buffer.readLong();
+      isDelete = buffer.readBoolean();
+   }
+
+   /**
+    * @return the messageId
+    */
+   public long getMessageId()
+   {
+      return messageId;
+   }
+
+   /**
+    * @return the isDelete
+    */
+   public boolean isDelete()
+   {
+      return isDelete;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -49,7 +49,7 @@
       super(REPLICATION_PREPARE);
    }
 
-   public ReplicationPrepareMessage(byte journalID, long txId, EncodingSupport encodingData)
+   public ReplicationPrepareMessage(final byte journalID, final long txId, final EncodingSupport encodingData)
    {
       this();
       this.journalID = journalID;
@@ -59,10 +59,10 @@
 
    // Public --------------------------------------------------------
 
+   @Override
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + 
-             DataConstants.SIZE_BYTE +
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
              DataConstants.SIZE_LONG +
              DataConstants.SIZE_INT +
              (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
@@ -100,7 +100,7 @@
    {
       return journalID;
    }
-   
+
    /**
     * @return the recordData
     */

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.remoting.impl.wireformat;
 
-
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
@@ -32,18 +31,17 @@
    {
       super(REPLICATION_RESPONSE);
    }
-   
+
    // Public --------------------------------------------------------
 
    /* (non-Javadoc)
     * @see org.hornetq.core.remoting.Packet#getRequiredBufferSize()
     */
+   @Override
    public int getRequiredBufferSize()
    {
       return BASIC_PACKET_SIZE;
    }
-   
-   
 
    // Package protected ---------------------------------------------
 

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -73,5 +73,13 @@
     * @param pageNumber
     */
    void pageWrite(PagedMessage message, int pageNumber);
+   
+   void largeMessageBegin(byte [] header);
+   
+   void largeMessageWrite(long messageId, byte [] body);
+   
+   void largeMessageEnd(long messageId);
+   
+   void largeMessageDelete(long messageId);
 
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -13,6 +13,10 @@
 
 package org.hornetq.core.replication.impl;
 
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
+
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -33,12 +37,16 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.utils.SimpleString;
 
@@ -72,6 +80,8 @@
    private PagingManager pageManager;
 
    private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
+   
+   private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new ConcurrentHashMap<Long, LargeServerMessage>();
 
    // Constructors --------------------------------------------------
    public ReplicationEndpointImpl(final HornetQServer server)
@@ -120,7 +130,22 @@
          {
             handlePageEvent((ReplicationPageEventMessage)packet);
          }
-
+         else if (packet.getType() == REPLICATION_LARGE_MESSAGE_BEGIN)
+         {
+            handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
+         }
+         else if (packet.getType() == REPLICATION_LARGE_MESSAGE_WRITE)
+         {
+            handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
+         }
+         else if (packet.getType() == REPLICATION_LARGE_MESSAGE_END)
+         {
+            handleLargeMessageEnd((ReplicationLargemessageEndMessage)packet);
+         }
+         else
+         {
+            log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
+         }
       }
       catch (Exception e)
       {
@@ -171,6 +196,31 @@
    {
       channel.close();
       storage.stop();
+      
+      for (ConcurrentMap<Integer, Page> map : pageIndex.values())
+      {
+         for (Page page : map.values())
+         {
+            try
+            {
+               page.close();
+            }
+            catch (Exception e)
+            {
+               log.warn("Error while closing the page on backup", e);
+            }
+         }
+      }
+      
+      pageIndex.clear();
+      
+      
+      for (LargeServerMessage largeMessage : largeMessages.values())
+      {
+         largeMessage.releaseResources();
+      }
+      
+      largeMessages.clear();
    }
 
    /* (non-Javadoc)
@@ -194,10 +244,87 @@
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
+   /** Finishes a replicated large message: deletes its file when the packet is flagged as delete, otherwise marks it as stored.
+    * @param packet end packet carrying the message ID and the delete flag; failures are logged, never rethrown
+    */
+   private void handleLargeMessageEnd(ReplicationLargemessageEndMessage packet)
+   {
+      LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), packet.isDelete()); // on delete the entry is also removed from largeMessages
+      if (message != null) // unknown IDs were already reported by lookupLargeMessage
+      {
+         if (packet.isDelete())
+         {
+            try
+            {
+               message.deleteFile();
+            }
+            catch (Exception e)
+            {
+               log.warn("Error deleting large message ID = " + packet.getMessageId(), e);
+            }
+         }
+         else
+         {
+            try
+            {
+               message.setStored();
+            }
+            catch (Exception e)
+            {
+               log.warn("Error marking large message as stored, ID = " + packet.getMessageId(), e); // was a copy of the delete-branch message
+            }
+         }
+      }
+   }
 
    /**
     * @param packet
     */
+   private void handleLargeMessageWrite(ReplicationLargeMessageWriteMessage packet) throws Exception
+   {
+      LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), false); // false: look up without removing the entry from largeMessages
+      if (message != null) // a missing message was already logged by lookupLargeMessage; the chunk is dropped
+      {
+         message.addBytes(packet.getBody()); // appends the replicated chunk to the large message
+      }
+   }
+   /** Returns the large message registered under messageId, removing it from largeMessages when isDelete is true;
+    *  logs a warning and returns null when no message is registered for that ID. */
+   private LargeServerMessage lookupLargeMessage(long messageId, boolean isDelete)
+   {
+      LargeServerMessage message;
+      
+      if (isDelete)
+      {
+         message = largeMessages.remove(messageId);
+      }
+      else
+      {
+         message = largeMessages.get(messageId);
+      }
+      
+      if (message == null)
+      {
+         log.warn("Large MessageID " + messageId + "  is not available on backup server. Ignoring replication message");
+      }
+      
+      return message;
+
+   }
+
+   /** Creates a large message on the backup from the replicated header and registers it in largeMessages under its message ID.
+    * @param packet begin packet whose header is handed to storage.createLargeMessage
+    */
+   private void handleLargeMessageBegin(ReplicationLargeMessageBeingMessage packet)
+   {
+      LargeServerMessage largeMessage = storage.createLargeMessage(packet.getHeader());
+      this.largeMessages.put(largeMessage.getMessageID(), largeMessage);
+   }
+
+
+   /**
+    * @param packet
+    */
    private void handleCommitRollback(final ReplicationCommitMessage packet) throws Exception
    {
       Journal journalToUse = getJournal(packet.getJournalID());

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -33,6 +33,9 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
@@ -252,8 +255,54 @@
          sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
       }
    }
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#largeMessageBegin(byte[])
+    * Sends a ReplicationLargeMessageBeingMessage wrapping the header; does nothing when enabled is false. */
+   public void largeMessageBegin(byte[] header)
+   {
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationLargeMessageBeingMessage(header));
+      }
+   }
 
    /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#largeMessageDelete(long)
+    * Sends a ReplicationLargemessageEndMessage built with true (presumably the delete flag - confirm against the packet class); does nothing when enabled is false. */
+   public void largeMessageDelete(long messageId)
+   {
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId, true));
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#largeMessageEnd(long)
+    * Sends a ReplicationLargemessageEndMessage built with false (presumably the delete flag - confirm against the packet class); does nothing when enabled is false. */
+   public void largeMessageEnd(long messageId)
+   {
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId, false));
+      }
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long, byte[])
+    * Sends a ReplicationLargeMessageWriteMessage with the message ID and the body chunk; does nothing when enabled is false. */
+   public void largeMessageWrite(long messageId, byte[] body)
+   {
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body));
+      }
+   }
+
+   
+
+   /* (non-Javadoc)
     * @see org.hornetq.core.server.HornetQComponent#isStarted()
     */
    public synchronized boolean isStarted()

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -164,7 +164,6 @@
    private final SimpleString nodeID;
 
    // The current currentLargeMessage being processed
-   // In case of replication, currentLargeMessage should only be accessed within the replication callbacks
    private volatile LargeServerMessage currentLargeMessage;
 
    private ServerSessionPacketHandler handler;
@@ -1688,13 +1687,7 @@
 
    private LargeServerMessage createLargeMessageStorage(final byte[] header) throws Exception
    {
-      LargeServerMessage largeMessage = storageManager.createLargeMessage();
-
-      HornetQBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
-
-      largeMessage.decodeProperties(headerBuffer);
-
-      return largeMessage;
+      return storageManager.createLargeMessage(header);
    }
 
    private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx) throws Exception

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-09 20:24:02 UTC (rev 8077)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-10 23:35:44 UTC (rev 8078)
@@ -67,6 +67,7 @@
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
 import org.hornetq.utils.SimpleString;
+import org.jboss.netty.channel.Channels;
 
 /**
  * A ReplicationTest
@@ -231,8 +232,23 @@
          manager.pageDeleted(dummy, 4);
          manager.pageDeleted(dummy, 5);
          manager.pageDeleted(dummy, 6);
+
+         blockOnReplication(manager);
          
+         ServerMessageImpl serverMsg = new ServerMessageImpl();
+         serverMsg.setMessageID(500);
+         serverMsg.setDestination(new SimpleString("tttt"));
+         
+         
+         HornetQBuffer buffer = ChannelBuffers.dynamicBuffer(100);
+         serverMsg.encodeProperties(buffer);
+         
+         manager.largeMessageBegin(buffer.array());
 
+         manager.largeMessageWrite(500, new byte[1024]);
+         
+         manager.largeMessageEnd(500);
+         
          blockOnReplication(manager);
          
          store.start();



More information about the hornetq-commits mailing list