[hornetq-commits] JBoss hornetq SVN: r8026 - in branches/Replication_Clebert: src/main/org/hornetq/core/remoting/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Oct 1 17:02:08 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-01 17:02:07 -0400 (Thu, 01 Oct 2009)
New Revision: 8026

Added:
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
Modified:
   branches/Replication_Clebert/.classpath
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
backup changes

Modified: branches/Replication_Clebert/.classpath
===================================================================
--- branches/Replication_Clebert/.classpath	2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/.classpath	2009-10-01 21:02:07 UTC (rev 8026)
@@ -97,7 +97,7 @@
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry kind="lib" path="tests/tmpfiles"/>
 	<classpathentry kind="lib" path="thirdparty/net/java/dev/javacc/lib/javacc.jar"/>
-	<classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar"/>
+	<classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar" sourcepath="/netty-3.1.3.GA"/>
 	<classpathentry kind="lib" path="thirdparty/log4j/lib/log4j.jar"/>
 	<classpathentry kind="lib" path="thirdparty/org/jboss/naming/lib/jnpserver.jar"/>
 	<classpathentry kind="lib" path="thirdparty/org/jboss/security/lib/jbosssx.jar"/>

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-10-01 21:02:07 UTC (rev 8026)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.remoting.impl;
 
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
@@ -75,6 +76,7 @@
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -364,6 +366,11 @@
             packet = new ReplicationAddMessage();
             break;
          }
+         case REPLICATION_RESPONSE:
+         {
+            packet = new ReplicationResponseMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-10-01 21:02:07 UTC (rev 8026)
@@ -28,16 +28,17 @@
    // Constants -------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(PacketImpl.class);
-   
+
    // The minimal size for all the packets, Common data for all the packets (look at PacketImpl.encode)
-   protected static final int BASIC_PACKET_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
+   protected static final int BASIC_PACKET_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE +
+                                                  DataConstants.SIZE_LONG;
 
    private long channelID;
 
    private final byte type;
-   
+
    private int size;
-   
+
    // The packet types
    // -----------------------------------------------------------------------------------
 
@@ -64,15 +65,14 @@
    public static final byte CREATE_QUEUE = 34;
 
    public static final byte DELETE_QUEUE = 35;
-   
+
    public static final byte CREATE_REPLICATION = 36;
 
-
    // Session
    public static final byte SESS_CREATECONSUMER = 40;
 
    public static final byte SESS_ACKNOWLEDGE = 41;
-   
+
    public static final byte SESS_EXPIRED = 42;
 
    public static final byte SESS_COMMIT = 43;
@@ -128,7 +128,7 @@
    public static final byte SESS_FLOWTOKEN = 70;
 
    public static final byte SESS_SEND = 71;
-   
+
    public static final byte SESS_SEND_LARGE = 72;
 
    public static final byte SESS_SEND_CONTINUATION = 73;
@@ -138,11 +138,15 @@
    public static final byte SESS_RECEIVE_MSG = 75;
 
    public static final byte SESS_RECEIVE_CONTINUATION = 76;
-   
+
    // Replication
-   
-   public static final byte REPLICATION_APPEND = 80;
 
+   public static final byte REPLICATION_RESPONSE = 80;
+
+   public static final byte REPLICATION_APPEND = 81;
+
+   public static final byte REPLICATION_DELETE = 82;
+
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)
@@ -152,7 +156,6 @@
 
    // Public --------------------------------------------------------
 
-
    public byte getType()
    {
       return type;
@@ -167,40 +170,40 @@
    {
       this.channelID = channelID;
    }
-   
+
    public int encode(final HornetQBuffer buffer)
    {
       // The standard header fields
       buffer.writeInt(0); // The length gets filled in at the end
       buffer.writeByte(type);
       buffer.writeLong(channelID);
-      
+
       encodeBody(buffer);
 
       size = buffer.writerIndex();
-      
+
       // The length doesn't include the actual length byte
       int len = size - DataConstants.SIZE_INT;
 
       buffer.setInt(0, len);
-      
+
       return size;
    }
 
    public void decode(final HornetQBuffer buffer)
    {
       channelID = buffer.readLong();
-      
+
       decodeBody(buffer);
-      
+
       size = buffer.readerIndex();
    }
-   
+
    public final int getPacketSize()
    {
       return size;
    }
-   
+
    public int getRequiredBufferSize()
    {
       return BASIC_PACKET_SIZE;
@@ -256,7 +259,7 @@
    {
       return DataConstants.SIZE_INT + str.length() * 2;
    }
-   
+
    protected int nullableStringEncodeSize(String str)
    {
       return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java	2009-10-01 21:02:07 UTC (rev 8026)
@@ -35,6 +35,8 @@
 
    /** 0 - Bindings, 1 - MessagesJournal */
    private byte journalID;
+   
+   private boolean isUpdate;
 
    private byte recordType;
 
@@ -51,10 +53,11 @@
       super(REPLICATION_APPEND);
    }
 
-   public ReplicationAddMessage(byte journalID, long id, byte recordType, EncodingSupport encodingData)
+   public ReplicationAddMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
    {
       this();
       this.journalID = journalID;
+      this.isUpdate = isUpdate;
       this.id = id;
       this.recordType = recordType;
       this.encodingData = encodingData;
@@ -64,7 +67,9 @@
 
    public int getRequiredBufferSize()
    {
-      return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
+      return BASIC_PACKET_SIZE + 
+             DataConstants.SIZE_BYTE +
+             DataConstants.SIZE_BOOLEAN +
              DataConstants.SIZE_LONG +
              DataConstants.SIZE_BYTE +
              DataConstants.SIZE_INT +
@@ -76,6 +81,7 @@
    public void encodeBody(final HornetQBuffer buffer)
    {
       buffer.writeByte(journalID);
+      buffer.writeBoolean(isUpdate);
       buffer.writeLong(id);
       buffer.writeByte(recordType);
       buffer.writeInt(encodingData.getEncodeSize());
@@ -86,6 +92,7 @@
    public void decodeBody(final HornetQBuffer buffer)
    {
       journalID = buffer.readByte();
+      isUpdate = buffer.readBoolean();
       id = buffer.readLong();
       recordType = buffer.readByte();
       int size = buffer.readInt();

Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java	2009-10-01 21:02:07 UTC (rev 8026)
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationDeleteMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationDeleteMessage extends PacketImpl
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long id;
+
+   /** 0 - Bindings, 1 - MessagesJournal */
+   private byte journalID;
+   
+   private boolean isUpdate;
+
+   private byte recordType;
+
+   private EncodingSupport encodingData;
+
+   private byte[] recordData;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationDeleteMessage()
+   {
+      super(REPLICATION_DELETE);
+   }
+
+   public ReplicationDeleteMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
+   {
+      this();
+      this.journalID = journalID;
+      this.isUpdate = isUpdate;
+      this.id = id;
+      this.recordType = recordType;
+      this.encodingData = encodingData;
+   }
+
+   // Public --------------------------------------------------------
+
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + 
+             DataConstants.SIZE_BYTE +
+             DataConstants.SIZE_BOOLEAN +
+             DataConstants.SIZE_LONG +
+             DataConstants.SIZE_BYTE +
+             DataConstants.SIZE_INT +
+             (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+   }
+
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(journalID);
+      buffer.writeBoolean(isUpdate);
+      buffer.writeLong(id);
+      buffer.writeByte(recordType);
+      buffer.writeInt(encodingData.getEncodeSize());
+      encodingData.encode(buffer);
+   }
+
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      journalID = buffer.readByte();
+      isUpdate = buffer.readBoolean();
+      id = buffer.readLong();
+      recordType = buffer.readByte();
+      int size = buffer.readInt();
+      recordData = new byte[size];
+      buffer.readBytes(recordData);
+   }
+
+   /**
+    * @return the id
+    */
+   public long getId()
+   {
+      return id;
+   }
+
+   /**
+    * @return the journalID
+    */
+   public byte getJournalID()
+   {
+      return journalID;
+   }
+
+   /**
+    * @return the recordType
+    */
+   public byte getRecordType()
+   {
+      return recordType;
+   }
+
+   /**
+    * @return the recordData
+    */
+   public byte[] getRecordData()
+   {
+      return recordData;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java	2009-10-01 21:02:07 UTC (rev 8026)
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class ReplicationResponseMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationResponseMessage()
+   {
+      super(REPLICATION_RESPONSE);
+   }
+   
+   // Public --------------------------------------------------------
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.remoting.Packet#getRequiredBufferSize()
+    */
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE;
+   }
+   
+   
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-10-01 21:02:07 UTC (rev 8026)
@@ -16,13 +16,31 @@
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.server.HornetQComponent;
 
-
 /**
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  *
  *
  */
-public interface ReplicationManager  extends HornetQComponent
+public interface ReplicationManager extends HornetQComponent
 {
-   void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record);
+   void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception;
+
+   void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+
+   void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception;
+
+   void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+
+   void appendUpdateRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+
+   void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception;
+
+   void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception;
+
+   void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception;
+
+   void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception;
+
+   void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception;
+
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-01 21:02:07 UTC (rev 8026)
@@ -22,6 +22,7 @@
 import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.server.HornetQServer;
 
@@ -79,7 +80,7 @@
          // TODO: what to do when the IO fails on the backup side? should we shutdown the backup?
          log.warn(e.getMessage(), e);
       }
-      channel.send(new NullResponseMessage());
+      channel.send(new ReplicationResponseMessage());
    }
 
    /* (non-Javadoc)

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-01 21:02:07 UTC (rev 8026)
@@ -90,10 +90,97 @@
 
    public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport encodingData)
    {
-      sendReplicatePacket(new ReplicationAddMessage(journalID, id, recordType, encodingData));
+      sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, encodingData));
    }
+   
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
+    */
+   public void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport encodingData, boolean sync) throws Exception
+   {
+      sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, encodingData));
+   }
 
    /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long, boolean)
+    */
+   public void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+
+   
+   public void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception
+   {
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
+    */
+   public void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecordTransactional(byte, long, long, org.hornetq.core.journal.EncodingSupport)
+    */
+   public void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecordTransactional(byte, long, long)
+    */
+   public void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#appendPrepareRecord(byte, long, org.hornetq.core.journal.EncodingSupport, boolean)
+    */
+   public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#appendRollbackRecord(byte, long, boolean)
+    */
+   public void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
+    */
+   public void appendUpdateRecordTransactional(byte journalID,
+                                               long txID,
+                                               long id,
+                                               byte recordType,
+                                               EncodingSupport record) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   
+   
+   
+
+   /* (non-Javadoc)
     * @see org.hornetq.core.server.HornetQComponent#isStarted()
     */
    public synchronized boolean isStarted()
@@ -217,7 +304,7 @@
       public void handlePacket(Packet packet)
       {
          System.out.println("HandlePacket on client");
-         if (packet.getType() == PacketImpl.NULL_RESPONSE)
+         if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
          {
             replicated();
          }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java	2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java	2009-10-01 21:02:07 UTC (rev 8026)
@@ -52,7 +52,7 @@
    /** To be called by the replication manager, when data is confirmed on the channel */
    public synchronized void replicated()
    {
-      if (pendings-- == 0)
+      if (--pendings == 0)
       {
          if (tasks != null)
          {
@@ -63,13 +63,12 @@
             tasks.clear();
          }
       }
-      System.out.println("pendings (replicated) = " + pendings);
    }
    
    /** You may have several actions to be done after a replication operation is completed. */
    public synchronized void addFutureCompletion(Runnable runnable)
    {
-      System.out.println("pendings = " + pendings);
+      System.out.println("pendings on addFutureCompletion = " + pendings);
       if (pendings == 0)
       {
          executor.execute(runnable);

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-01 21:02:07 UTC (rev 8026)
@@ -603,10 +603,10 @@
       if (replicationEndpoint == null)
       {
          replicationEndpoint = new ReplicationEndpointImpl(this);
+         replicationEndpoint.setChannel(channel);
          replicationEndpoint.start();
       }
       
-      replicationEndpoint.setChannel(channel);
       
       return replicationEndpoint;
    }

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-01 21:02:07 UTC (rev 8026)
@@ -146,7 +146,10 @@
       {
          ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
          manager.start();
-         manager.appendAddRecord((byte)0, 1, (byte)1, new DataImplement());
+         for (int i = 0; i < 100; i++)
+         {
+            manager.appendAddRecord((byte)0, i, (byte)1, new DataImplement());
+         }
          final CountDownLatch latch = new CountDownLatch(1);
          manager.getReplicationToken().addFutureCompletion(new Runnable()
          {



More information about the hornetq-commits mailing list