[hornetq-commits] JBoss hornetq SVN: r8011 - in branches/Replication_Clebert: src/main/org/hornetq/core/remoting/impl/wireformat and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 30 00:04:32 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-09-30 00:04:31 -0400 (Wed, 30 Sep 2009)
New Revision: 8011

Added:
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
changes...

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-09-29 21:16:46 UTC (rev 8010)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-09-30 04:04:31 UTC (rev 8011)
@@ -13,7 +13,7 @@
 
 package org.hornetq.core.remoting.impl;
 
-
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
@@ -74,6 +74,7 @@
 import org.hornetq.core.remoting.impl.wireformat.Ping;
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
 import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -353,12 +354,16 @@
             packet = new SessionSendContinuationMessage();
             break;
          }
-
          case CREATE_REPLICATION:
          {
             packet = new CreateReplicationSessionMessage();
             break;
          }
+         case REPLICATION_APPEND:
+         {
+            packet = new ReplicationAddMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-09-29 21:16:46 UTC (rev 8010)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-09-30 04:04:31 UTC (rev 8011)
@@ -141,7 +141,7 @@
    
    // Replication
    
-   public static final byte REPLICATION_SEND_REPLICATION = 77;
+   public static final byte REPLICATION_APPEND = 77;
 
    // Static --------------------------------------------------------
 

Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	2009-09-30 04:04:31 UTC (rev 8011)
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A REPLICATION_APPEND packet carrying a record id, a record type and the record payload.
+ * <p>
+ * Body layout on the wire: id (long), record type (byte), payload length (int), payload bytes.
+ * <p>
+ * A message built for sending holds its payload as an {@link EncodingSupport}; a message
+ * filled in by {@link #decodeBody(HornetQBuffer)} holds it as a raw byte array instead.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class ReplicationAddMessage extends PacketImpl
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long id;
+
+   private byte recordType;
+
+   /** Payload to encode; set only by the sending-side constructor. */
+   private EncodingSupport encodingData;
+
+   /** Payload bytes read from the wire; set only by decodeBody. */
+   private byte[] recordData;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /** Creates an empty message, to be populated by {@link #decodeBody(HornetQBuffer)}. */
+   public ReplicationAddMessage()
+   {
+      super(REPLICATION_APPEND);
+   }
+
+   /**
+    * Creates a message ready to be sent.
+    *
+    * @param id the record id
+    * @param recordType the record type
+    * @param encodingData the payload, encoded into the packet body when the message is sent
+    */
+   public ReplicationAddMessage(long id, byte recordType, EncodingSupport encodingData)
+   {
+      this();
+      this.id = id;
+      this.recordType = recordType;
+      this.encodingData = encodingData;
+   }
+
+   // Public --------------------------------------------------------
+
+   /** @return the record id */
+   public long getId()
+   {
+      return id;
+   }
+
+   /** @return the record type */
+   public byte getRecordType()
+   {
+      return recordType;
+   }
+
+   /** @return the payload bytes read by decodeBody, or null if this message was not decoded */
+   public byte[] getRecordData()
+   {
+      return recordData;
+   }
+
+   /** @return BASIC_PACKET_SIZE plus the number of bytes written by encodeBody */
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE +
+             DataConstants.SIZE_LONG +
+             DataConstants.SIZE_BYTE +
+             DataConstants.SIZE_INT +
+             payloadSize();
+   }
+
+   /**
+    * Writes id, record type, payload length and payload.
+    * The payload comes from encodingData when present, otherwise from the decoded bytes,
+    * so a message obtained through decodeBody can be encoded again.
+    */
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(id);
+      buffer.writeByte(recordType);
+      buffer.writeInt(payloadSize());
+      if (encodingData != null)
+      {
+         encodingData.encode(buffer);
+      }
+      else if (recordData != null)
+      {
+         buffer.writeBytes(recordData);
+      }
+   }
+
+   /** Reads id, record type and the length-prefixed payload bytes. */
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      id = buffer.readLong();
+      recordType = buffer.readByte();
+      int size = buffer.readInt();
+      recordData = new byte[size];
+      buffer.readBytes(recordData);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /** Payload length in bytes: from encodingData if set, else the decoded bytes, else 0. */
+   private int payloadSize()
+   {
+      if (encodingData != null)
+      {
+         return encodingData.getEncodeSize();
+      }
+      return recordData != null ? recordData.length : 0;
+   }
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-09-29 21:16:46 UTC (rev 8010)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-09-30 04:04:31 UTC (rev 8011)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.replication;
 
+import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.server.HornetQComponent;
 
 
@@ -23,5 +24,5 @@
  */
 public interface ReplicationManager  extends HornetQComponent
 {
-   void replicate(byte[] bytes, ReplicationToken token);
+   void appendAddRecord(long id, byte recordType, EncodingSupport record);
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-09-29 21:16:46 UTC (rev 8010)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-09-30 04:04:31 UTC (rev 8011)
@@ -14,9 +14,11 @@
 package org.hornetq.core.replication.impl;
 
 import org.hornetq.core.client.impl.ConnectionManager;
+import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.replication.ReplicationToken;
 
@@ -63,9 +65,11 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
     */
-   public void replicate(byte[] bytes, ReplicationToken token)
+   
+   
+   public void appendAddRecord(long id, byte recordType, EncodingSupport encodingData)
    {
-      replicatingChannel.send(new CreateReplicationSessionMessage(1, 1));
+      replicatingChannel.send(new ReplicationAddMessage(id, recordType, encodingData));
    }
 
    /* (non-Javadoc)

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-09-29 21:16:46 UTC (rev 8010)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-09-30 04:04:31 UTC (rev 8011)
@@ -34,6 +34,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.management.impl.HornetQServerControlImpl;
 import org.hornetq.core.persistence.StorageManager;
@@ -47,6 +48,7 @@
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
 import org.hornetq.core.replication.impl.ReplicationManagerImpl;
@@ -165,7 +167,8 @@
       {
          ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
          manager.start();
-         manager.replicate(new byte[]{3}, null);
+         manager.appendAddRecord(1, (byte)1, new DataImplement());
+         Thread.sleep(1000);
          manager.stop();
       }
       finally
@@ -173,7 +176,29 @@
          server.stop();
       }
    }
+   
+   /** Fixed-size stub record: encodes SIZE zero bytes and ignores decode. */
+   static class DataImplement implements EncodingSupport
+   {
 
+      private static final int SIZE = 5;
+
+      public void decode(HornetQBuffer buffer)
+      {
+         // intentionally a no-op: this stub carries no state to restore
+      }
+
+      public void encode(HornetQBuffer buffer)
+      {
+         buffer.writeBytes(new byte[SIZE]);
+      }
+
+      public int getEncodeSize()
+      {
+         return SIZE;
+      }
+   }
+
    // Package protected ---------------------------------------------
    class LocalRemotingServiceImpl extends RemotingServiceImpl
    {



More information about the hornetq-commits mailing list