[hornetq-commits] JBoss hornetq SVN: r8030 - in branches/Replication_Clebert: src/main/org/hornetq/core/journal/impl and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Oct 1 23:39:16 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-01 23:39:15 -0400 (Thu, 01 Oct 2009)
New Revision: 8030
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Changes
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -71,6 +71,8 @@
*/
void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception;
+ void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception;
+
void appendRollbackRecord(long txID, boolean sync) throws Exception;
// Load
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -1163,7 +1163,17 @@
{
appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
+ */
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
+ {
+ appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
+ }
+
+
/**
*
* <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
@@ -3342,7 +3352,12 @@
private static class NullEncoding implements EncodingSupport
{
- static NullEncoding instance = new NullEncoding();
+ private static NullEncoding instance = new NullEncoding();
+
+ public static NullEncoding getInstance()
+ {
+ return instance;
+ }
public void decode(final HornetQBuffer buffer)
{
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -12,7 +12,11 @@
*/
package org.hornetq.core.remoting.impl;
-
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
@@ -76,6 +80,11 @@
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
@@ -366,6 +375,31 @@
packet = new ReplicationAddMessage();
break;
}
+ case REPLICATION_APPEND_TX:
+ {
+ packet = new ReplicationAddTXMessage();
+ break;
+ }
+ case REPLICATION_DELETE:
+ {
+ packet = new ReplicationDeleteMessage();
+ break;
+ }
+ case REPLICATION_DELETE_TX:
+ {
+ packet = new ReplicationDeleteTXMessage();
+ break;
+ }
+ case REPLICATION_PREPARE:
+ {
+ packet = new ReplicationPrepareMessage();
+ break;
+ }
+ case REPLICATION_COMMIT_ROLLBACK:
+ {
+ packet = new ReplicationCommitMessage();
+ break;
+ }
case REPLICATION_RESPONSE:
{
packet = new ReplicationResponseMessage();
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -145,8 +145,18 @@
public static final byte REPLICATION_APPEND = 81;
- public static final byte REPLICATION_DELETE = 82;
+ public static final byte REPLICATION_APPEND_TX = 82;
+ public static final byte REPLICATION_DELETE = 83;
+
+ public static final byte REPLICATION_DELETE_TX = 84;
+
+ public static final byte REPLICATION_PREPARE = 85;
+
+ public static final byte REPLICATION_COMMIT_ROLLBACK = 86;
+
+
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -115,6 +115,11 @@
{
return journalID;
}
+
+ public boolean isUpdate()
+ {
+ return isUpdate;
+ }
/**
* @return the recordType
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationAddMessage
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationAddTXMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long txId;
+
+ private long id;
+
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
+ private boolean isUpdate;
+
+ private byte recordType;
+
+ private EncodingSupport encodingData;
+
+ private byte[] recordData;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationAddTXMessage()
+ {
+ super(REPLICATION_APPEND_TX);
+ }
+
+ public ReplicationAddTXMessage(byte journalID, boolean isUpdate, long txId, long id, byte recordType, EncodingSupport encodingData)
+ {
+ this();
+ this.journalID = journalID;
+ this.isUpdate = isUpdate;
+ this.txId = txId;
+ this.id = id;
+ this.recordType = recordType;
+ this.encodingData = encodingData;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_BOOLEAN +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_INT +
+ (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalID);
+ buffer.writeBoolean(isUpdate);
+ buffer.writeLong(txId);
+ buffer.writeLong(id);
+ buffer.writeByte(recordType);
+ buffer.writeInt(encodingData.getEncodeSize());
+ encodingData.encode(buffer);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ journalID = buffer.readByte();
+ isUpdate = buffer.readBoolean();
+ txId = buffer.readLong();
+ id = buffer.readLong();
+ recordType = buffer.readByte();
+ int size = buffer.readInt();
+ recordData = new byte[size];
+ buffer.readBytes(recordData);
+ }
+
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+
+ public long getTxId()
+ {
+ return txId;
+ }
+
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ public boolean isUpdate()
+ {
+ return isUpdate;
+ }
+
+ /**
+ * @return the recordType
+ */
+ public byte getRecordType()
+ {
+ return recordType;
+ }
+
+ /**
+ * @return the recordData
+ */
+ public byte[] getRecordData()
+ {
+ return recordData;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationAddMessage
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationCommitMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
+ private boolean rollback;
+
+ private long txId;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationCommitMessage()
+ {
+ super(REPLICATION_COMMIT_ROLLBACK);
+ }
+
+ public ReplicationCommitMessage(byte journalID, boolean rollback, long txId)
+ {
+ this();
+ this.journalID = journalID;
+ this.rollback = rollback;
+ this.txId = txId;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_LONG;
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalID);
+ buffer.writeBoolean(rollback);
+ buffer.writeLong(txId);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ journalID = buffer.readByte();
+ rollback = buffer.readBoolean();
+ txId = buffer.readLong();
+ }
+
+ public boolean isRollback()
+ {
+ return rollback;
+ }
+
+ public long getTxId()
+ {
+ return txId;
+ }
+
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -35,15 +35,7 @@
/** 0 - Bindings, 1 - MessagesJournal */
private byte journalID;
-
- private boolean isUpdate;
- private byte recordType;
-
- private EncodingSupport encodingData;
-
- private byte[] recordData;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -53,14 +45,11 @@
super(REPLICATION_DELETE);
}
- public ReplicationDeleteMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
+ public ReplicationDeleteMessage(byte journalID, long id)
{
this();
this.journalID = journalID;
- this.isUpdate = isUpdate;
this.id = id;
- this.recordType = recordType;
- this.encodingData = encodingData;
}
// Public --------------------------------------------------------
@@ -69,11 +58,7 @@
{
return BASIC_PACKET_SIZE +
DataConstants.SIZE_BYTE +
- DataConstants.SIZE_BOOLEAN +
- DataConstants.SIZE_LONG +
- DataConstants.SIZE_BYTE +
- DataConstants.SIZE_INT +
- (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+ DataConstants.SIZE_LONG;
}
@@ -81,23 +66,14 @@
public void encodeBody(final HornetQBuffer buffer)
{
buffer.writeByte(journalID);
- buffer.writeBoolean(isUpdate);
buffer.writeLong(id);
- buffer.writeByte(recordType);
- buffer.writeInt(encodingData.getEncodeSize());
- encodingData.encode(buffer);
}
@Override
public void decodeBody(final HornetQBuffer buffer)
{
journalID = buffer.readByte();
- isUpdate = buffer.readBoolean();
id = buffer.readLong();
- recordType = buffer.readByte();
- int size = buffer.readInt();
- recordData = new byte[size];
- buffer.readBytes(recordData);
}
/**
@@ -116,22 +92,7 @@
return journalID;
}
- /**
- * @return the recordType
- */
- public byte getRecordType()
- {
- return recordType;
- }
- /**
- * @return the recordData
- */
- public byte[] getRecordData()
- {
- return recordData;
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationAddMessage
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationDeleteTXMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long txId;
+
+ private long id;
+
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
+ private EncodingSupport encodingData;
+
+ private byte[] recordData;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationDeleteTXMessage()
+ {
+ super(REPLICATION_DELETE_TX);
+ }
+
+ public ReplicationDeleteTXMessage(byte journalID, long txId, long id, EncodingSupport encodingData)
+ {
+ this();
+ this.journalID = journalID;
+ this.txId = txId;
+ this.id = id;
+ this.encodingData = encodingData;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT +
+ (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalID);
+ buffer.writeLong(txId);
+ buffer.writeLong(id);
+ buffer.writeInt(encodingData.getEncodeSize());
+ encodingData.encode(buffer);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ journalID = buffer.readByte();
+ txId = buffer.readLong();
+ id = buffer.readLong();
+ int size = buffer.readInt();
+ recordData = new byte[size];
+ buffer.readBytes(recordData);
+ }
+
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+
+ public long getTxId()
+ {
+ return txId;
+ }
+
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ /**
+ * @return the recordData
+ */
+ public byte[] getRecordData()
+ {
+ return recordData;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationAddMessage
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationPrepareMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long txId;
+
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
+ private EncodingSupport encodingData;
+
+ private byte[] recordData;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationPrepareMessage()
+ {
+ super(REPLICATION_DELETE_TX);
+ }
+
+ public ReplicationPrepareMessage(byte journalID, long txId, EncodingSupport encodingData)
+ {
+ this();
+ this.journalID = journalID;
+ this.txId = txId;
+ this.encodingData = encodingData;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT +
+ (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalID);
+ buffer.writeLong(txId);
+ buffer.writeInt(encodingData.getEncodeSize());
+ encodingData.encode(buffer);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ journalID = buffer.readByte();
+ txId = buffer.readLong();
+ int size = buffer.readInt();
+ recordData = new byte[size];
+ buffer.readBytes(recordData);
+ }
+
+ public long getTxId()
+ {
+ return txId;
+ }
+
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ /**
+ * @return the recordData
+ */
+ public byte[] getRecordData()
+ {
+ return recordData;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -25,9 +25,9 @@
{
void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception;
- void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+ void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception;
- void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception;
+ void appendDeleteRecord(byte journalID, long id) throws Exception;
void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception;
@@ -37,10 +37,10 @@
void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception;
- void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception;
+ void appendCommitRecord(byte journalID, long txID) throws Exception;
- void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception;
+ void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception;
- void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception;
+ void appendRollbackRecord(byte journalID, long txID) throws Exception;
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -19,9 +19,13 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
@@ -49,7 +53,7 @@
private Journal bindingsJournal;
private Journal messagingJournal;
-
+
private JournalStorageManager storage;
// Static --------------------------------------------------------
@@ -71,9 +75,28 @@
{
if (packet.getType() == PacketImpl.REPLICATION_APPEND)
{
- System.out.println("Replicated");
handleAppendAddRecord(packet);
}
+ else if (packet.getType() == PacketImpl.REPLICATION_APPEND_TX)
+ {
+ handleAppendAddTXRecord(packet);
+ }
+ else if (packet.getType() == PacketImpl.REPLICATION_DELETE)
+ {
+ handleAppendDelete(packet);
+ }
+ else if (packet.getType() == PacketImpl.REPLICATION_DELETE_TX)
+ {
+ handleAppendDeleteTX(packet);
+ }
+ else if (packet.getType() == PacketImpl.REPLICATION_PREPARE)
+ {
+ handlePrepare(packet);
+ }
+ else if (packet.getType() == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
+ {
+ handleCommitRollback(packet);
+ }
}
catch (Exception e)
{
@@ -97,14 +120,14 @@
public void start() throws Exception
{
Configuration config = server.getConfiguration();
-
+
// TODO: this needs an executor
JournalStorageManager storage = new JournalStorageManager(config, null);
storage.start();
-
+
this.bindingsJournal = storage.getBindingsJournal();
this.messagingJournal = storage.getBindingsJournal();
-
+
// We only need to load internal structures on the backup...
storage.loadInternalOnly();
}
@@ -138,26 +161,128 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ /**
+ * @param packet
+ */
+ private void handleCommitRollback(Packet packet) throws Exception
+ {
+ ReplicationCommitMessage commitMessage = (ReplicationCommitMessage)packet;
+ Journal journalToUse = getJournal(commitMessage.getJournalID());
+
+
+ if (commitMessage.isRollback())
+ {
+ journalToUse.appendRollbackRecord(commitMessage.getTxId(), false);
+ }
+ else
+ {
+ journalToUse.appendCommitRecord(commitMessage.getTxId(), false);
+ }
+ }
+
/**
* @param packet
+ */
+ private void handlePrepare(Packet packet) throws Exception
+ {
+ ReplicationPrepareMessage prepareMessage = (ReplicationPrepareMessage)packet;
+
+ Journal journalToUse = getJournal(prepareMessage.getJournalID());
+
+ journalToUse.appendPrepareRecord(prepareMessage.getTxId(), prepareMessage.getRecordData(), false);
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendDeleteTX(Packet packet) throws Exception
+ {
+ ReplicationDeleteTXMessage deleteMessage = (ReplicationDeleteTXMessage)packet;
+
+ Journal journalToUse = getJournal(deleteMessage.getJournalID());
+
+ journalToUse.appendDeleteRecordTransactional(deleteMessage.getTxId(),
+ deleteMessage.getId(),
+ deleteMessage.getRecordData());
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendDelete(Packet packet) throws Exception
+ {
+ ReplicationDeleteMessage deleteMessage = (ReplicationDeleteMessage)packet;
+
+ Journal journalToUse = getJournal(deleteMessage.getJournalID());
+
+ journalToUse.appendDeleteRecord(deleteMessage.getId(), false);
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendAddTXRecord(Packet packet) throws Exception
+ {
+ ReplicationAddTXMessage addMessage = (ReplicationAddTXMessage)packet;
+
+ Journal journalToUse = getJournal(addMessage.getJournalID());
+
+ if (addMessage.isUpdate())
+ {
+ journalToUse.appendUpdateRecordTransactional(addMessage.getTxId(),
+ addMessage.getId(),
+ addMessage.getRecordType(),
+ addMessage.getRecordData());
+ }
+ else
+ {
+ journalToUse.appendAddRecordTransactional(addMessage.getTxId(),
+ addMessage.getId(),
+ addMessage.getRecordType(),
+ addMessage.getRecordData());
+ }
+ }
+
+ /**
+ * @param packet
* @throws Exception
*/
private void handleAppendAddRecord(Packet packet) throws Exception
{
ReplicationAddMessage addMessage = (ReplicationAddMessage)packet;
- Journal journalToUse;
- if (addMessage.getJournalID() == (byte)0)
+ Journal journalToUse = getJournal(addMessage.getJournalID());
+
+ if (addMessage.isUpdate())
{
+ journalToUse.appendUpdateRecord(addMessage.getId(),
+ addMessage.getRecordType(),
+ addMessage.getRecordData(),
+ false);
+ }
+ else
+ {
+ journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
+ }
+ }
+
+ /**
+ * @param journalID
+ * @return
+ */
+ private Journal getJournal(byte journalID)
+ {
+ Journal journalToUse;
+ if (journalID == (byte)0)
+ {
journalToUse = bindingsJournal;
}
else
{
journalToUse = messagingJournal;
}
-
- journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
+ return journalToUse;
}
// Inner classes -------------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -19,6 +19,7 @@
import org.hornetq.core.client.impl.ConnectionManager;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
@@ -27,6 +28,12 @@
import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.ReplicationToken;
@@ -46,7 +53,7 @@
// Attributes ----------------------------------------------------
// TODO: where should this be configured?
- private static final int WINDOW_SIZE = 100 * 1024;
+ private static final int WINDOW_SIZE = 1024 * 1024;
private final ResponseHandler responseHandler = new ResponseHandler();
@@ -88,43 +95,52 @@
* @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
*/
- public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport encodingData)
+ public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport record)
{
- sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, encodingData));
+ sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, record));
}
-
-
+
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport encodingData, boolean sync) throws Exception
+ public void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception
{
- sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, encodingData));
+ sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, record));
}
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long, boolean)
*/
- public void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception
+ public void appendDeleteRecord(byte journalID, long id) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
}
+ public void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception
+ {
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id, recordType, record));
+ }
- public void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendUpdateRecordTransactional(byte journalID,
+ long txID,
+ long id,
+ byte recordType,
+ EncodingSupport record) throws Exception
{
-
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id, recordType, record));
}
+
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
*/
- public void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception
+ public void appendCommitRecord(byte journalID, long txID) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
}
/* (non-Javadoc)
@@ -132,8 +148,7 @@
*/
public void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
}
/* (non-Javadoc)
@@ -141,46 +156,26 @@
*/
public void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, NullEncoding.instance));
}
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendPrepareRecord(byte, long, org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception
+ public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData));
}
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendRollbackRecord(byte, long, boolean)
*/
- public void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception
+ public void appendRollbackRecord(byte journalID, long txID) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
}
/* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
- */
- public void appendUpdateRecordTransactional(byte journalID,
- long txID,
- long id,
- byte recordType,
- EncodingSupport record) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
-
-
-
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
*/
public synchronized boolean isStarted()
@@ -202,7 +197,7 @@
this.replicatingChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
this.replicatingChannel.setHandler(this.responseHandler);
-
+
CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(channelID,
WINDOW_SIZE);
@@ -312,4 +307,24 @@
}
+ private static class NullEncoding implements EncodingSupport
+ {
+
+ static NullEncoding instance = new NullEncoding();
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ }
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ }
+
+ public int getEncodeSize()
+ {
+ return 0;
+ }
+
+ }
+
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -146,10 +146,18 @@
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
manager.start();
- for (int i = 0; i < 100; i++)
- {
- manager.appendAddRecord((byte)0, i, (byte)1, new DataImplement());
- }
+
+ manager.appendAddRecord((byte)0, 1, (byte)1, new FakeData());
+ manager.appendUpdateRecord((byte)0, 1, (byte)2, new FakeData());
+ manager.appendDeleteRecord((byte)0, 1);
+ manager.appendAddRecordTransactional((byte)0, 2, 2, (byte)1, new FakeData());
+ manager.appendUpdateRecordTransactional((byte)0, 2, 2, (byte)2, new FakeData());
+ manager.appendCommitRecord((byte)0, 2);
+
+ manager.appendDeleteRecordTransactional((byte)0, 3, 4,new FakeData());
+ manager.appendPrepareRecord((byte)0, 3, new FakeData());
+ manager.appendRollbackRecord((byte)0, 3);
+
final CountDownLatch latch = new CountDownLatch(1);
manager.getReplicationToken().addFutureCompletion(new Runnable()
{
@@ -169,7 +177,7 @@
}
}
- class DataImplement implements EncodingSupport
+ class FakeData implements EncodingSupport
{
public void decode(HornetQBuffer buffer)
More information about the hornetq-commits
mailing list