[hornetq-commits] JBoss hornetq SVN: r8030 - in branches/Replication_Clebert: src/main/org/hornetq/core/journal/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Oct 1 23:39:16 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-01 23:39:15 -0400 (Thu, 01 Oct 2009)
New Revision: 8030

Added:
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
   branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Changes

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java	2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -71,6 +71,8 @@
     */
    void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception;
 
+   void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception;
+
    void appendRollbackRecord(long txID, boolean sync) throws Exception;
 
    // Load

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -1163,7 +1163,17 @@
    {
       appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
    }
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
+    */
+   public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
+   {
+      appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
+   }
 
+
+
    /** 
     * 
     * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction 
@@ -3342,7 +3352,12 @@
    private static class NullEncoding implements EncodingSupport
    {
 
-      static NullEncoding instance = new NullEncoding();
+      private static NullEncoding instance = new NullEncoding();
+      
+      public static NullEncoding getInstance()
+      {
+         return instance;
+      }
 
       public void decode(final HornetQBuffer buffer)
       {

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -12,7 +12,11 @@
  */
 
 package org.hornetq.core.remoting.impl;
-
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
@@ -76,6 +80,11 @@
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
@@ -366,6 +375,31 @@
             packet = new ReplicationAddMessage();
             break;
          }
+         case REPLICATION_APPEND_TX:
+         {
+            packet = new ReplicationAddTXMessage();
+            break;
+         }
+         case REPLICATION_DELETE:
+         {
+            packet = new ReplicationDeleteMessage();
+            break;
+         }
+         case REPLICATION_DELETE_TX:
+         {
+            packet = new ReplicationDeleteTXMessage();
+            break;
+         }
+         case REPLICATION_PREPARE:
+         {
+            packet = new ReplicationPrepareMessage();
+            break;
+         }
+         case REPLICATION_COMMIT_ROLLBACK:
+         {
+            packet = new ReplicationCommitMessage();
+            break;
+         }
          case REPLICATION_RESPONSE:
          {
             packet = new ReplicationResponseMessage();

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -145,8 +145,18 @@
 
    public static final byte REPLICATION_APPEND = 81;
 
-   public static final byte REPLICATION_DELETE = 82;
+   public static final byte REPLICATION_APPEND_TX = 82;
 
+   public static final byte REPLICATION_DELETE = 83;
+
+   public static final byte REPLICATION_DELETE_TX = 84;
+   
+   public static final byte REPLICATION_PREPARE = 85;
+   
+   public static final byte REPLICATION_COMMIT_ROLLBACK = 86;
+   
+   
+
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -115,6 +115,11 @@
    {
       return journalID;
    }
+   
+   public boolean isUpdate()
+   {
+      return isUpdate;
+   }
 
    /**
     * @return the recordType

Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Replication packet carrying a transactional add or update journal record.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationAddTXMessage extends PacketImpl
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long txId;
+   
+   private long id;
+
+   /** 0 - Bindings, 1 - MessagesJournal */
+   private byte journalID;
+   
+   private boolean isUpdate;
+
+   private byte recordType;
+
+   private EncodingSupport encodingData;
+
+   private byte[] recordData;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationAddTXMessage()
+   {
+      super(REPLICATION_APPEND_TX);
+   }
+
+   public ReplicationAddTXMessage(byte journalID, boolean isUpdate, long txId, long id, byte recordType, EncodingSupport encodingData)
+   {
+      this();
+      this.journalID = journalID;
+      this.isUpdate = isUpdate;
+      this.txId = txId;
+      this.id = id;
+      this.recordType = recordType;
+      this.encodingData = encodingData;
+   }
+
+   // Public --------------------------------------------------------
+
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + 
+             DataConstants.SIZE_BYTE +
+             DataConstants.SIZE_BOOLEAN +
+             DataConstants.SIZE_LONG +
+             DataConstants.SIZE_LONG +
+             DataConstants.SIZE_BYTE +
+             DataConstants.SIZE_INT +
+             (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+   }
+
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(journalID);
+      buffer.writeBoolean(isUpdate);
+      buffer.writeLong(txId);
+      buffer.writeLong(id);
+      buffer.writeByte(recordType);
+      buffer.writeInt(encodingData.getEncodeSize());
+      encodingData.encode(buffer);
+   }
+
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      journalID = buffer.readByte();
+      isUpdate = buffer.readBoolean();
+      txId = buffer.readLong();
+      id = buffer.readLong();
+      recordType = buffer.readByte();
+      int size = buffer.readInt();
+      recordData = new byte[size];
+      buffer.readBytes(recordData);
+   }
+
+   /**
+    * @return the id
+    */
+   public long getId()
+   {
+      return id;
+   }
+   
+   public long getTxId()
+   {
+      return txId;
+   }
+
+   /**
+    * @return the journalID
+    */
+   public byte getJournalID()
+   {
+      return journalID;
+   }
+   
+   public boolean isUpdate()
+   {
+      return isUpdate;
+   }
+
+   /**
+    * @return the recordType
+    */
+   public byte getRecordType()
+   {
+      return recordType;
+   }
+
+   /**
+    * @return the recordData
+    */
+   public byte[] getRecordData()
+   {
+      return recordData;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Replication packet carrying a transaction commit or rollback for a journal.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationCommitMessage extends PacketImpl
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** 0 - Bindings, 1 - MessagesJournal */
+   private byte journalID;
+
+   private boolean rollback;
+
+   private long txId;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationCommitMessage()
+   {
+      super(REPLICATION_COMMIT_ROLLBACK);
+   }
+
+   public ReplicationCommitMessage(byte journalID, boolean rollback, long txId)
+   {
+      this();
+      this.journalID = journalID;
+      this.rollback = rollback;
+      this.txId = txId;
+   }
+
+   // Public --------------------------------------------------------
+
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_LONG;
+   }
+
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(journalID);
+      buffer.writeBoolean(rollback);
+      buffer.writeLong(txId);
+   }
+
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      journalID = buffer.readByte();
+      rollback = buffer.readBoolean();
+      txId = buffer.readLong();
+   }
+
+   public boolean isRollback()
+   {
+      return rollback;
+   }
+
+   public long getTxId()
+   {
+      return txId;
+   }
+
+   /**
+    * @return the journalID
+    */
+   public byte getJournalID()
+   {
+      return journalID;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java	2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -35,15 +35,7 @@
 
    /** 0 - Bindings, 1 - MessagesJournal */
    private byte journalID;
-   
-   private boolean isUpdate;
 
-   private byte recordType;
-
-   private EncodingSupport encodingData;
-
-   private byte[] recordData;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -53,14 +45,11 @@
       super(REPLICATION_DELETE);
    }
 
-   public ReplicationDeleteMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
+   public ReplicationDeleteMessage(byte journalID, long id)
    {
       this();
       this.journalID = journalID;
-      this.isUpdate = isUpdate;
       this.id = id;
-      this.recordType = recordType;
-      this.encodingData = encodingData;
    }
 
    // Public --------------------------------------------------------
@@ -69,11 +58,7 @@
    {
       return BASIC_PACKET_SIZE + 
              DataConstants.SIZE_BYTE +
-             DataConstants.SIZE_BOOLEAN +
-             DataConstants.SIZE_LONG +
-             DataConstants.SIZE_BYTE +
-             DataConstants.SIZE_INT +
-             (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+             DataConstants.SIZE_LONG;
 
    }
 
@@ -81,23 +66,14 @@
    public void encodeBody(final HornetQBuffer buffer)
    {
       buffer.writeByte(journalID);
-      buffer.writeBoolean(isUpdate);
       buffer.writeLong(id);
-      buffer.writeByte(recordType);
-      buffer.writeInt(encodingData.getEncodeSize());
-      encodingData.encode(buffer);
    }
 
    @Override
    public void decodeBody(final HornetQBuffer buffer)
    {
       journalID = buffer.readByte();
-      isUpdate = buffer.readBoolean();
       id = buffer.readLong();
-      recordType = buffer.readByte();
-      int size = buffer.readInt();
-      recordData = new byte[size];
-      buffer.readBytes(recordData);
    }
 
    /**
@@ -116,22 +92,7 @@
       return journalID;
    }
 
-   /**
-    * @return the recordType
-    */
-   public byte getRecordType()
-   {
-      return recordType;
-   }
 
-   /**
-    * @return the recordData
-    */
-   public byte[] getRecordData()
-   {
-      return recordData;
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Replication packet carrying a transactional delete journal record.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationDeleteTXMessage extends PacketImpl
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long txId;
+   
+   private long id;
+
+   /** 0 - Bindings, 1 - MessagesJournal */
+   private byte journalID;
+
+   private EncodingSupport encodingData;
+
+   private byte[] recordData;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationDeleteTXMessage()
+   {
+      super(REPLICATION_DELETE_TX);
+   }
+
+   public ReplicationDeleteTXMessage(byte journalID, long txId, long id, EncodingSupport encodingData)
+   {
+      this();
+      this.journalID = journalID;
+      this.txId = txId;
+      this.id = id;
+      this.encodingData = encodingData;
+   }
+
+   // Public --------------------------------------------------------
+
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + 
+             DataConstants.SIZE_BYTE +
+             DataConstants.SIZE_LONG +
+             DataConstants.SIZE_LONG +
+             DataConstants.SIZE_INT +
+             (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+   }
+
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(journalID);
+      buffer.writeLong(txId);
+      buffer.writeLong(id);
+      buffer.writeInt(encodingData.getEncodeSize());
+      encodingData.encode(buffer);
+   }
+
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      journalID = buffer.readByte();
+      txId = buffer.readLong();
+      id = buffer.readLong();
+      int size = buffer.readInt();
+      recordData = new byte[size];
+      buffer.readBytes(recordData);
+   }
+
+   /**
+    * @return the id
+    */
+   public long getId()
+   {
+      return id;
+   }
+   
+   public long getTxId()
+   {
+      return txId;
+   }
+
+   /**
+    * @return the journalID
+    */
+   public byte getJournalID()
+   {
+      return journalID;
+   }
+   
+   /**
+    * @return the recordData
+    */
+   public byte[] getRecordData()
+   {
+      return recordData;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Replication packet carrying a transaction prepare record and its transaction data.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationPrepareMessage extends PacketImpl
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long txId;
+
+   /** 0 - Bindings, 1 - MessagesJournal */
+   private byte journalID;
+
+   private EncodingSupport encodingData;
+
+   private byte[] recordData;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationPrepareMessage()
+   {
+      super(REPLICATION_PREPARE); // type byte must match the PacketDecoder case that instantiates this class
+   }
+
+   public ReplicationPrepareMessage(byte journalID, long txId, EncodingSupport encodingData)
+   {
+      this();
+      this.journalID = journalID;
+      this.txId = txId;
+      this.encodingData = encodingData;
+   }
+
+   // Public --------------------------------------------------------
+
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + 
+             DataConstants.SIZE_BYTE +
+             DataConstants.SIZE_LONG +
+             DataConstants.SIZE_INT +
+             (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+   }
+
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(journalID);
+      buffer.writeLong(txId);
+      buffer.writeInt(encodingData.getEncodeSize());
+      encodingData.encode(buffer);
+   }
+
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      journalID = buffer.readByte();
+      txId = buffer.readLong();
+      int size = buffer.readInt();
+      recordData = new byte[size];
+      buffer.readBytes(recordData);
+   }
+
+   public long getTxId()
+   {
+      return txId;
+   }
+
+   /**
+    * @return the journalID
+    */
+   public byte getJournalID()
+   {
+      return journalID;
+   }
+   
+   /**
+    * @return the recordData
+    */
+   public byte[] getRecordData()
+   {
+      return recordData;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -25,9 +25,9 @@
 {
    void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception;
 
-   void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+   void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception;
 
-   void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception;
+   void appendDeleteRecord(byte journalID, long id) throws Exception;
 
    void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception;
 
@@ -37,10 +37,10 @@
 
    void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception;
 
-   void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception;
+   void appendCommitRecord(byte journalID, long txID) throws Exception;
 
-   void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception;
+   void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception;
 
-   void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception;
+   void appendRollbackRecord(byte journalID, long txID) throws Exception;
 
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -19,9 +19,13 @@
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.server.HornetQServer;
@@ -49,7 +53,7 @@
    private Journal bindingsJournal;
 
    private Journal messagingJournal;
-   
+
    private JournalStorageManager storage;
 
    // Static --------------------------------------------------------
@@ -71,9 +75,28 @@
       {
          if (packet.getType() == PacketImpl.REPLICATION_APPEND)
          {
-            System.out.println("Replicated");
             handleAppendAddRecord(packet);
          }
+         else if (packet.getType() == PacketImpl.REPLICATION_APPEND_TX)
+         {
+            handleAppendAddTXRecord(packet);
+         }
+         else if (packet.getType() == PacketImpl.REPLICATION_DELETE)
+         {
+            handleAppendDelete(packet);
+         }
+         else if (packet.getType() == PacketImpl.REPLICATION_DELETE_TX)
+         {
+            handleAppendDeleteTX(packet);
+         }
+         else if (packet.getType() == PacketImpl.REPLICATION_PREPARE)
+         {
+            handlePrepare(packet);
+         }
+         else if (packet.getType() == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
+         {
+            handleCommitRollback(packet);
+         }
       }
       catch (Exception e)
       {
@@ -97,14 +120,14 @@
    public void start() throws Exception
    {
       Configuration config = server.getConfiguration();
-      
+
       // TODO: this needs an executor
       JournalStorageManager storage = new JournalStorageManager(config, null);
       storage.start();
-      
+
       this.bindingsJournal = storage.getBindingsJournal();
       this.messagingJournal = storage.getBindingsJournal();
-      
+
       // We only need to load internal structures on the backup...
       storage.loadInternalOnly();
    }
@@ -138,26 +161,128 @@
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
+   /**
+    * @param packet a ReplicationCommitMessage; its rollback flag selects a rollback or a commit record
+    */
+   private void handleCommitRollback(Packet packet) throws Exception
+   {
+      ReplicationCommitMessage commitMessage = (ReplicationCommitMessage)packet;
 
+      Journal journalToUse = getJournal(commitMessage.getJournalID());
+
+      // Commit and rollback arrive as the same packet type; isRollback() tells them apart.
+      if (commitMessage.isRollback())
+      {
+         journalToUse.appendRollbackRecord(commitMessage.getTxId(), false);
+      }
+      else
+      {
+         journalToUse.appendCommitRecord(commitMessage.getTxId(), false);
+      }
+   }
+
    /**
     * @param packet
+    */
+   private void handlePrepare(Packet packet) throws Exception
+   {
+      ReplicationPrepareMessage prepareMessage = (ReplicationPrepareMessage)packet;
+      // The packet's journal ID selects which local journal receives the prepare record.
+      Journal journalToUse = getJournal(prepareMessage.getJournalID());
+
+      journalToUse.appendPrepareRecord(prepareMessage.getTxId(), prepareMessage.getRecordData(), false);
+   }
+
+   /**
+    * @param packet a ReplicationDeleteTXMessage carrying the journal ID, transaction ID, record ID and record data
+    */
+   private void handleAppendDeleteTX(Packet packet) throws Exception
+   {
+      ReplicationDeleteTXMessage deleteMessage = (ReplicationDeleteTXMessage)packet;
+
+      Journal journalToUse = getJournal(deleteMessage.getJournalID());
+
+      journalToUse.appendDeleteRecordTransactional(deleteMessage.getTxId(),
+                                                   deleteMessage.getId(),
+                                                   deleteMessage.getRecordData());
+   }
+
+   /**
+    * @param packet a ReplicationDeleteMessage carrying the journal ID and the ID of the record to delete
+    */
+   private void handleAppendDelete(Packet packet) throws Exception
+   {
+      ReplicationDeleteMessage deleteMessage = (ReplicationDeleteMessage)packet;
+
+      Journal journalToUse = getJournal(deleteMessage.getJournalID());
+
+      journalToUse.appendDeleteRecord(deleteMessage.getId(), false);
+   }
+
+   /**
+    * @param packet a ReplicationAddTXMessage; its update flag selects a transactional update or a transactional add
+    */
+   private void handleAppendAddTXRecord(Packet packet) throws Exception
+   {
+      ReplicationAddTXMessage addMessage = (ReplicationAddTXMessage)packet;
+
+      Journal journalToUse = getJournal(addMessage.getJournalID());
+
+      if (addMessage.isUpdate())
+      {
+         journalToUse.appendUpdateRecordTransactional(addMessage.getTxId(),
+                                                      addMessage.getId(),
+                                                      addMessage.getRecordType(),
+                                                      addMessage.getRecordData());
+      }
+      else
+      {
+         journalToUse.appendAddRecordTransactional(addMessage.getTxId(),
+                                                   addMessage.getId(),
+                                                   addMessage.getRecordType(),
+                                                   addMessage.getRecordData());
+      }
+   }
+
+   /**
+    * @param packet a ReplicationAddMessage; its update flag selects appendUpdateRecord or appendAddRecord
     * @throws Exception
     */
    private void handleAppendAddRecord(Packet packet) throws Exception
    {
       ReplicationAddMessage addMessage = (ReplicationAddMessage)packet;
-      Journal journalToUse;
 
-      if (addMessage.getJournalID() == (byte)0)
+      Journal journalToUse = getJournal(addMessage.getJournalID());
+
+      if (addMessage.isUpdate())
       {
+         journalToUse.appendUpdateRecord(addMessage.getId(),
+                                         addMessage.getRecordType(),
+                                         addMessage.getRecordData(),
+                                         false);
+      }
+      else
+      {
+         journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
+      }
+   }
+
+   /**
+    * @param journalID journal selector: (byte)0 picks the bindings journal, any other value the messaging journal
+    * @return the journal selected by journalID
+    */
+   private Journal getJournal(byte journalID)
+   {
+      Journal journalToUse;
+      if (journalID == (byte)0)
+      {
          journalToUse = bindingsJournal;
       }
       else
       {
          journalToUse = messagingJournal;
       }
-
-      journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
+      return journalToUse;
    }
 
    // Inner classes -------------------------------------------------

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -19,6 +19,7 @@
 
 import org.hornetq.core.client.impl.ConnectionManager;
 import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
@@ -27,6 +28,12 @@
 import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.replication.ReplicationToken;
 
@@ -46,7 +53,7 @@
    // Attributes ----------------------------------------------------
 
    // TODO: where should this be configured?
-   private static final int WINDOW_SIZE = 100 * 1024;
+   private static final int WINDOW_SIZE = 1024 * 1024;
 
    private final ResponseHandler responseHandler = new ResponseHandler();
 
@@ -88,43 +95,52 @@
     * @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
     */
 
-   public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport encodingData)
+   public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport record)
    {
-      sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, encodingData));
+      sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, record));
    }
-   
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
     */
-   public void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport encodingData, boolean sync) throws Exception
+   public void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception
    {
-      sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, encodingData));
+      sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, record));
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long, boolean)
     */
-   public void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception
+   public void appendDeleteRecord(byte journalID, long id) throws Exception
    {
-      // TODO Auto-generated method stub
-      
+      sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
    }
 
+   public void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception
+   {
+      sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id, recordType, record));
+   }
 
    
-   public void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
+    */
+   public void appendUpdateRecordTransactional(byte journalID,
+                                               long txID,
+                                               long id,
+                                               byte recordType,
+                                               EncodingSupport record) throws Exception
    {
-      
+      sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id, recordType, record));
    }
 
+   
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
     */
-   public void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception
+   public void appendCommitRecord(byte journalID, long txID) throws Exception
    {
-      // TODO Auto-generated method stub
-      
+      sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
    }
 
    /* (non-Javadoc)
@@ -132,8 +148,7 @@
     */
    public void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception
    {
-      // TODO Auto-generated method stub
-      
+      sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
    }
 
    /* (non-Javadoc)
@@ -141,46 +156,26 @@
     */
    public void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception
    {
-      // TODO Auto-generated method stub
-      
+      sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, NullEncoding.instance));
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendPrepareRecord(byte, long, org.hornetq.core.journal.EncodingSupport, boolean)
     */
-   public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception
+   public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception
    {
-      // TODO Auto-generated method stub
-      
+      sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData));
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendRollbackRecord(byte, long, boolean)
     */
-   public void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception
+   public void appendRollbackRecord(byte journalID, long txID) throws Exception
    {
-      // TODO Auto-generated method stub
-      
+      sendReplicatePacket(new ReplicationCommitMessage(journalID, true, txID)); // true = rollback; false would replicate a commit
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
-    */
-   public void appendUpdateRecordTransactional(byte journalID,
-                                               long txID,
-                                               long id,
-                                               byte recordType,
-                                               EncodingSupport record) throws Exception
-   {
-      // TODO Auto-generated method stub
-      
-   }
-
-   
-   
-   
-
-   /* (non-Javadoc)
     * @see org.hornetq.core.server.HornetQComponent#isStarted()
     */
    public synchronized boolean isStarted()
@@ -202,7 +197,7 @@
       this.replicatingChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
 
       this.replicatingChannel.setHandler(this.responseHandler);
-      
+
       CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(channelID,
                                                                                                     WINDOW_SIZE);
 
@@ -312,4 +307,24 @@
 
    }
 
+   private static class NullEncoding implements EncodingSupport
+   {
+      // Stateless placeholder used when a record has no payload: encodes nothing and reports size 0.
+      static final NullEncoding instance = new NullEncoding();
+      
+      public void decode(final HornetQBuffer buffer)
+      {
+      }
+
+      public void encode(final HornetQBuffer buffer)
+      {
+      }
+
+      public int getEncodeSize()
+      {
+         return 0;
+      }
+
+   }
+   }
+
 }

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-02 03:39:15 UTC (rev 8030)
@@ -146,10 +146,18 @@
       {
          ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
          manager.start();
-         for (int i = 0; i < 100; i++)
-         {
-            manager.appendAddRecord((byte)0, i, (byte)1, new DataImplement());
-         }
+         
+         manager.appendAddRecord((byte)0, 1, (byte)1, new FakeData());
+         manager.appendUpdateRecord((byte)0, 1, (byte)2, new FakeData());
+         manager.appendDeleteRecord((byte)0, 1);
+         manager.appendAddRecordTransactional((byte)0, 2, 2, (byte)1, new FakeData());
+         manager.appendUpdateRecordTransactional((byte)0, 2, 2, (byte)2, new FakeData());
+         manager.appendCommitRecord((byte)0, 2);
+         
+         manager.appendDeleteRecordTransactional((byte)0, 3, 4,new FakeData());
+         manager.appendPrepareRecord((byte)0, 3, new FakeData());
+         manager.appendRollbackRecord((byte)0, 3);
+
          final CountDownLatch latch = new CountDownLatch(1);
          manager.getReplicationToken().addFutureCompletion(new Runnable()
          {
@@ -169,7 +177,7 @@
       }
    }
 
-   class DataImplement implements EncodingSupport
+   class FakeData implements EncodingSupport
    {
 
       public void decode(HornetQBuffer buffer)



More information about the hornetq-commits mailing list