Author: clebert.suconic(a)jboss.com
Date: 2009-09-30 00:04:31 -0400 (Wed, 30 Sep 2009)
New Revision: 8011
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
changes...
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
---
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-09-29
21:16:46 UTC (rev 8010)
+++
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-09-30
04:04:31 UTC (rev 8011)
@@ -13,7 +13,7 @@
package org.hornetq.core.remoting.impl;
-
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
@@ -74,6 +74,7 @@
import org.hornetq.core.remoting.impl.wireformat.Ping;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -353,12 +354,16 @@
packet = new SessionSendContinuationMessage();
break;
}
-
case CREATE_REPLICATION:
{
packet = new CreateReplicationSessionMessage();
break;
}
+ case REPLICATION_APPEND:
+ {
+ packet = new ReplicationAddMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
---
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-09-29
21:16:46 UTC (rev 8010)
+++
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-09-30
04:04:31 UTC (rev 8011)
@@ -141,7 +141,7 @@
// Replication
- public static final byte REPLICATION_SEND_REPLICATION = 77;
+ public static final byte REPLICATION_APPEND = 77;
// Static --------------------------------------------------------
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
---
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
(rev 0)
+++
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-09-30
04:04:31 UTC (rev 8011)
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Packet of type {@link PacketImpl#REPLICATION_APPEND} that carries one record:
+ * its id, a record type byte and the record body.
+ * <p>
+ * The body is held in one of two forms. An instance built with
+ * {@link #ReplicationAddMessage(long, byte, EncodingSupport)} keeps the
+ * {@link EncodingSupport} it was given; an instance filled in by
+ * {@link #decodeBody(HornetQBuffer)} keeps the raw bytes read from the buffer.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class ReplicationAddMessage extends PacketImpl
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long id;
+
+   private byte recordType;
+
+   /** Body supplied through the constructor; stays null on an instance populated by decodeBody. */
+   private EncodingSupport encodingData;
+
+   /** Raw body bytes read by decodeBody; stays null on an instance built through the constructor. */
+   private byte[] recordData;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates an empty packet of type REPLICATION_APPEND, to be filled in by
+    * {@link #decodeBody(HornetQBuffer)}.
+    */
+   public ReplicationAddMessage()
+   {
+      super(REPLICATION_APPEND);
+   }
+
+   /**
+    * Creates a packet ready to be encoded.
+    *
+    * @param id the record id written first in the body
+    * @param recordType the record type byte written after the id
+    * @param encodingData the record body; its encoded size and bytes are written by encodeBody
+    */
+   public ReplicationAddMessage(long id, byte recordType, EncodingSupport encodingData)
+   {
+      this();
+      this.id = id;
+      this.recordType = recordType;
+      this.encodingData = encodingData;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Returns the buffer size needed for this packet: the basic packet size plus
+    * a long (id), a byte (record type), an int (body length) and the body itself.
+    *
+    * @return the required size in bytes
+    */
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE +
+             DataConstants.SIZE_LONG +
+             DataConstants.SIZE_BYTE +
+             DataConstants.SIZE_INT +
+             getBodyLength();
+   }
+
+   /**
+    * Writes id, record type, body length and body, in that order.
+    * <p>
+    * The body comes from the EncodingSupport when one was supplied, otherwise
+    * from the bytes captured by decodeBody, so that a decoded packet can be
+    * encoded again instead of failing with a NullPointerException. This keeps
+    * the bytes written here in step with getRequiredBufferSize.
+    *
+    * @param buffer the buffer to write the body into
+    */
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(id);
+      buffer.writeByte(recordType);
+      buffer.writeInt(getBodyLength());
+      if (encodingData != null)
+      {
+         encodingData.encode(buffer);
+      }
+      else if (recordData != null)
+      {
+         buffer.writeBytes(recordData);
+      }
+   }
+
+   /**
+    * Reads id, record type and body length, then copies that many bytes of
+    * body into recordData.
+    *
+    * @param buffer the buffer to read the body from
+    */
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      id = buffer.readLong();
+      recordType = buffer.readByte();
+      int size = buffer.readInt();
+      recordData = new byte[size];
+      buffer.readBytes(recordData);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Length in bytes of the record body, taken from whichever form holds it.
+    * Returns 0 when neither form is set, so that size computation and encoding
+    * agree on an empty body rather than throwing.
+    */
+   private int getBodyLength()
+   {
+      if (encodingData != null)
+      {
+         return encodingData.getEncodeSize();
+      }
+      return recordData != null ? recordData.length : 0;
+   }
+
+   // Inner classes -------------------------------------------------
+
+}
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-09-29
21:16:46 UTC (rev 8010)
+++
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-09-30
04:04:31 UTC (rev 8011)
@@ -13,6 +13,7 @@
package org.hornetq.core.replication;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.server.HornetQComponent;
@@ -23,5 +24,5 @@
*/
public interface ReplicationManager extends HornetQComponent
{
- void replicate(byte[] bytes, ReplicationToken token);
+ void appendAddRecord(long id, byte recordType, EncodingSupport record);
}
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-29
21:16:46 UTC (rev 8010)
+++
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-30
04:04:31 UTC (rev 8011)
@@ -14,9 +14,11 @@
package org.hornetq.core.replication.impl;
import org.hornetq.core.client.impl.ConnectionManager;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.ReplicationToken;
@@ -63,9 +65,11 @@
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendAddRecord(long, byte,
org.hornetq.core.journal.EncodingSupport)
*/
- public void replicate(byte[] bytes, ReplicationToken token)
+
+
+ public void appendAddRecord(long id, byte recordType, EncodingSupport encodingData)
{
- replicatingChannel.send(new CreateReplicationSessionMessage(1, 1));
+ replicatingChannel.send(new ReplicationAddMessage(id, recordType, encodingData));
}
/* (non-Javadoc)
Modified:
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-29
21:16:46 UTC (rev 8010)
+++
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-30
04:04:31 UTC (rev 8011)
@@ -34,6 +34,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -47,6 +48,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
@@ -165,7 +167,8 @@
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
manager.start();
- manager.replicate(new byte[]{3}, null);
+ manager.appendAddRecord(1, (byte)1, new DataImplement());
+ Thread.sleep(1000);
manager.stop();
}
finally
@@ -173,7 +176,29 @@
server.stop();
}
}
+
+   /** Minimal EncodingSupport stub whose encoded form is a fixed run of zero bytes. */
+   class DataImplement implements EncodingSupport
+   {
+      /** Number of bytes written by encode and reported by getEncodeSize. */
+      private static final int ENCODED_SIZE = 5;
+
+      /** Reads nothing from the buffer. */
+      public void decode(HornetQBuffer buffer)
+      {
+      }
+
+      /** Writes ENCODED_SIZE zero bytes into the buffer. */
+      public void encode(HornetQBuffer buffer)
+      {
+         final byte[] payload = new byte[ENCODED_SIZE];
+         buffer.writeBytes(payload);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+       */
+      public int getEncodeSize()
+      {
+         return ENCODED_SIZE;
+      }
+
+   }
+
// Package protected ---------------------------------------------
class LocalRemotingServiceImpl extends RemotingServiceImpl
{