[hornetq-commits] JBoss hornetq SVN: r8026 - in branches/Replication_Clebert: src/main/org/hornetq/core/remoting/impl and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Oct 1 17:02:08 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-01 17:02:07 -0400 (Thu, 01 Oct 2009)
New Revision: 8026
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
Modified:
branches/Replication_Clebert/.classpath
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
backup changes
Modified: branches/Replication_Clebert/.classpath
===================================================================
--- branches/Replication_Clebert/.classpath 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/.classpath 2009-10-01 21:02:07 UTC (rev 8026)
@@ -97,7 +97,7 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="tests/tmpfiles"/>
<classpathentry kind="lib" path="thirdparty/net/java/dev/javacc/lib/javacc.jar"/>
- <classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar" sourcepath="/netty-3.1.3.GA"/>
<classpathentry kind="lib" path="thirdparty/log4j/lib/log4j.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/naming/lib/jnpserver.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/security/lib/jbosssx.jar"/>
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -13,6 +13,7 @@
package org.hornetq.core.remoting.impl;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
@@ -75,6 +76,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -364,6 +366,11 @@
packet = new ReplicationAddMessage();
break;
}
+ case REPLICATION_RESPONSE:
+ {
+ packet = new ReplicationResponseMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -28,16 +28,17 @@
// Constants -------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(PacketImpl.class);
-
+
// The minimal size for all the packets, Common data for all the packets (look at PacketImpl.encode)
- protected static final int BASIC_PACKET_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
+ protected static final int BASIC_PACKET_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_LONG;
private long channelID;
private final byte type;
-
+
private int size;
-
+
// The packet types
// -----------------------------------------------------------------------------------
@@ -64,15 +65,14 @@
public static final byte CREATE_QUEUE = 34;
public static final byte DELETE_QUEUE = 35;
-
+
public static final byte CREATE_REPLICATION = 36;
-
// Session
public static final byte SESS_CREATECONSUMER = 40;
public static final byte SESS_ACKNOWLEDGE = 41;
-
+
public static final byte SESS_EXPIRED = 42;
public static final byte SESS_COMMIT = 43;
@@ -128,7 +128,7 @@
public static final byte SESS_FLOWTOKEN = 70;
public static final byte SESS_SEND = 71;
-
+
public static final byte SESS_SEND_LARGE = 72;
public static final byte SESS_SEND_CONTINUATION = 73;
@@ -138,11 +138,15 @@
public static final byte SESS_RECEIVE_MSG = 75;
public static final byte SESS_RECEIVE_CONTINUATION = 76;
-
+
// Replication
-
- public static final byte REPLICATION_APPEND = 80;
+ public static final byte REPLICATION_RESPONSE = 80;
+
+ public static final byte REPLICATION_APPEND = 81;
+
+ public static final byte REPLICATION_DELETE = 82;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
@@ -152,7 +156,6 @@
// Public --------------------------------------------------------
-
public byte getType()
{
return type;
@@ -167,40 +170,40 @@
{
this.channelID = channelID;
}
-
+
public int encode(final HornetQBuffer buffer)
{
// The standard header fields
buffer.writeInt(0); // The length gets filled in at the end
buffer.writeByte(type);
buffer.writeLong(channelID);
-
+
encodeBody(buffer);
size = buffer.writerIndex();
-
+
// The length doesn't include the actual length byte
int len = size - DataConstants.SIZE_INT;
buffer.setInt(0, len);
-
+
return size;
}
public void decode(final HornetQBuffer buffer)
{
channelID = buffer.readLong();
-
+
decodeBody(buffer);
-
+
size = buffer.readerIndex();
}
-
+
public final int getPacketSize()
{
return size;
}
-
+
public int getRequiredBufferSize()
{
return BASIC_PACKET_SIZE;
@@ -256,7 +259,7 @@
{
return DataConstants.SIZE_INT + str.length() * 2;
}
-
+
protected int nullableStringEncodeSize(String str)
{
return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -35,6 +35,8 @@
/** 0 - Bindings, 1 - MessagesJournal */
private byte journalID;
+
+ private boolean isUpdate;
private byte recordType;
@@ -51,10 +53,11 @@
super(REPLICATION_APPEND);
}
- public ReplicationAddMessage(byte journalID, long id, byte recordType, EncodingSupport encodingData)
+ public ReplicationAddMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
{
this();
this.journalID = journalID;
+ this.isUpdate = isUpdate;
this.id = id;
this.recordType = recordType;
this.encodingData = encodingData;
@@ -64,7 +67,9 @@
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BYTE +
DataConstants.SIZE_INT +
@@ -76,6 +81,7 @@
public void encodeBody(final HornetQBuffer buffer)
{
buffer.writeByte(journalID);
+ buffer.writeBoolean(isUpdate);
buffer.writeLong(id);
buffer.writeByte(recordType);
buffer.writeInt(encodingData.getEncodeSize());
@@ -86,6 +92,7 @@
public void decodeBody(final HornetQBuffer buffer)
{
journalID = buffer.readByte();
+ isUpdate = buffer.readBoolean();
id = buffer.readLong();
recordType = buffer.readByte();
int size = buffer.readInt();
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationAddMessage
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationDeleteMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long id;
+
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
+ private boolean isUpdate;
+
+ private byte recordType;
+
+ private EncodingSupport encodingData;
+
+ private byte[] recordData;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationDeleteMessage()
+ {
+ super(REPLICATION_DELETE);
+ }
+
+ public ReplicationDeleteMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
+ {
+ this();
+ this.journalID = journalID;
+ this.isUpdate = isUpdate;
+ this.id = id;
+ this.recordType = recordType;
+ this.encodingData = encodingData;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_BOOLEAN +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_INT +
+ (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalID);
+ buffer.writeBoolean(isUpdate);
+ buffer.writeLong(id);
+ buffer.writeByte(recordType);
+ buffer.writeInt(encodingData.getEncodeSize());
+ encodingData.encode(buffer);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ journalID = buffer.readByte();
+ isUpdate = buffer.readBoolean();
+ id = buffer.readLong();
+ recordType = buffer.readByte();
+ int size = buffer.readInt();
+ recordData = new byte[size];
+ buffer.readBytes(recordData);
+ }
+
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ /**
+ * @return the recordType
+ */
+ public byte getRecordType()
+ {
+ return recordType;
+ }
+
+ /**
+ * @return the recordData
+ */
+ public byte[] getRecordData()
+ {
+ return recordData;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class ReplicationResponseMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationResponseMessage()
+ {
+ super(REPLICATION_RESPONSE);
+ }
+
+ // Public --------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.remoting.Packet#getRequiredBufferSize()
+ */
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE;
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -16,13 +16,31 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.server.HornetQComponent;
-
/**
* @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
*
*
*/
-public interface ReplicationManager extends HornetQComponent
+public interface ReplicationManager extends HornetQComponent
{
- void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record);
+ void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception;
+
+ void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+
+ void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception;
+
+ void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+
+ void appendUpdateRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+
+ void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception;
+
+ void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception;
+
+ void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception;
+
+ void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception;
+
+ void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception;
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -22,6 +22,7 @@
import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
@@ -79,7 +80,7 @@
// TODO: what to do when the IO fails on the backup side? should we shutdown the backup?
log.warn(e.getMessage(), e);
}
- channel.send(new NullResponseMessage());
+ channel.send(new ReplicationResponseMessage());
}
/* (non-Javadoc)
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -90,10 +90,97 @@
public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport encodingData)
{
- sendReplicatePacket(new ReplicationAddMessage(journalID, id, recordType, encodingData));
+ sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, encodingData));
}
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport encodingData, boolean sync) throws Exception
+ {
+ sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, encodingData));
+ }
/* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long, boolean)
+ */
+ public void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+
+
+ public void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
+ */
+ public void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecordTransactional(byte, long, long, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecordTransactional(byte, long, long)
+ */
+ public void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendPrepareRecord(byte, long, org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendRollbackRecord(byte, long, boolean)
+ */
+ public void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendUpdateRecordTransactional(byte journalID,
+ long txID,
+ long id,
+ byte recordType,
+ EncodingSupport record) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+
+
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
*/
public synchronized boolean isStarted()
@@ -217,7 +304,7 @@
public void handlePacket(Packet packet)
{
System.out.println("HandlePacket on client");
- if (packet.getType() == PacketImpl.NULL_RESPONSE)
+ if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
{
replicated();
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -52,7 +52,7 @@
/** To be called by the replication manager, when data is confirmed on the channel */
public synchronized void replicated()
{
- if (pendings-- == 0)
+ if (--pendings == 0)
{
if (tasks != null)
{
@@ -63,13 +63,12 @@
tasks.clear();
}
}
- System.out.println("pendings (replicated) = " + pendings);
}
/** You may have several actions to be done after a replication operation is completed. */
public synchronized void addFutureCompletion(Runnable runnable)
{
- System.out.println("pendings = " + pendings);
+ System.out.println("pendings on addFutureCompletion = " + pendings);
if (pendings == 0)
{
executor.execute(runnable);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -603,10 +603,10 @@
if (replicationEndpoint == null)
{
replicationEndpoint = new ReplicationEndpointImpl(this);
+ replicationEndpoint.setChannel(channel);
replicationEndpoint.start();
}
- replicationEndpoint.setChannel(channel);
return replicationEndpoint;
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -146,7 +146,10 @@
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
manager.start();
- manager.appendAddRecord((byte)0, 1, (byte)1, new DataImplement());
+ for (int i = 0; i < 100; i++)
+ {
+ manager.appendAddRecord((byte)0, i, (byte)1, new DataImplement());
+ }
final CountDownLatch latch = new CountDownLatch(1);
manager.getReplicationToken().addFutureCompletion(new Runnable()
{
More information about the hornetq-commits
mailing list