[hornetq-commits] JBoss hornetq SVN: r11181 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/server/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Aug 10 11:05:24 EDT 2011


Author: borges
Date: 2011-08-10 11:05:24 -0400 (Wed, 10 Aug 2011)
New Revision: 11181

Added:
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
HORNETQ-720 Run "FailoverTest" with delayed sync & implement more FileWrapperJournal calls.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-10 15:05:24 UTC (rev 11181)
@@ -351,7 +351,7 @@
     * @param replicationManager
     * @throws HornetQException
     */
-   public void setReplicator(ReplicationManager replicationManager) throws Exception
+   public void startReplication(ReplicationManager replicationManager) throws Exception
    {
       if (!started)
       {
@@ -375,7 +375,6 @@
 
       try
       {
-         // XXX HORNETQ-720 WRITE LOCK the StorageManager.
          storageManagerLock.writeLock().lock();
          try
          {
@@ -398,7 +397,6 @@
          }
          finally
          {
-            // XXX HORNETQ-720 UNLOCK StorageManager...
             storageManagerLock.writeLock().unlock();
          }
          sendJournalFile(messageFiles, JournalContent.MESSAGES);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-10 15:05:24 UTC (rev 11181)
@@ -2029,7 +2029,7 @@
       replicationManager = new ReplicationManagerImpl(rc, executorFactory);
       replicationManager.start();
 
-      journalStorageManager.setReplicator(replicationManager);
+      journalStorageManager.startReplication(replicationManager);
    }
 
    /**

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-08-10 15:05:24 UTC (rev 11181)
@@ -13,6 +13,8 @@
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
 import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
 import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
 
@@ -128,7 +130,8 @@
    public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
             throws Exception
    {
-      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+      JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+      writeRecord(addRecord, false, null);
    }
 
    @Override
@@ -144,21 +147,24 @@
    public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
             throws Exception
    {
-      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+      JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
+      writeRecord(updateRecordTX, false, null);
    }
 
    @Override
    public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext)
             throws Exception
    {
-      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+      JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, txID, null);
+      writeRecord(commitRecord, sync, callback);
    }
 
    @Override
    public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback)
             throws Exception
    {
-      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+      JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(false, txID, transactionData);
+      writeRecord(prepareRecord, sync, callback);
    }
 
    @Override

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-08-10 15:05:24 UTC (rev 11181)
@@ -1027,7 +1027,6 @@
 
    private void setJournalState(JournalState newState)
    {
-      // log.info(this + " state=" + newState);
       state = newState;
    }
 
@@ -1750,7 +1749,6 @@
    private synchronized JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions,
             final boolean replicationSync) throws Exception
    {
-      System.out.println("LOAD! " + state + " " + replicationSync);
       if (state == JournalState.STOPPED || state == JournalState.LOADED)
       {
          throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-10 15:05:24 UTC (rev 11181)
@@ -2,10 +2,8 @@
 
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.locks.Lock;
 
 import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientProducer;
@@ -15,16 +13,7 @@
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.ChannelHandler;
-import org.hornetq.core.protocol.core.CommandConfirmationHandler;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
-import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
-import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
 import org.hornetq.tests.util.TransportConfigurationUtils;
 
@@ -35,7 +24,7 @@
    private ClientSessionFactoryInternal sessionFactory;
    private ClientSession session;
    private ClientProducer producer;
-   private ReplicationChannelHandler handler;
+   private BackupSyncDelay syncDelay;
    private static final int N_MSGS = 100;
 
    @Override
@@ -48,8 +37,7 @@
       locator.setBlockOnDurableSend(true);
       locator.setReconnectAttempts(-1);
       sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
-      handler = new ReplicationChannelHandler();
-      liveServer.addInterceptor(new BackupSyncDelay(handler));
+      syncDelay = new BackupSyncDelay(backupServer, liveServer);
    }
 
    public void testNodeID() throws Exception
@@ -100,9 +88,7 @@
 
    private void finishSyncAndFailover() throws Exception
    {
-      handler.deliver = true;
-      // must send one more message to have the "SYNC is DONE" msg delivered.
-      sendMessages(session, producer, 1);
+      syncDelay.deliverUpToDateMsg();
       waitForBackup(sessionFactory, 10, true);
       assertFalse("should not be initialized", backupServer.getServer().isInitialised());
       crash(session);
@@ -127,7 +113,7 @@
    private void startBackupCrashLive() throws Exception
    {
       assertFalse("backup is started?", backupServer.isStarted());
-      handler.setHold(false);
+      liveServer.removeInterceptor(syncDelay);
       backupServer.start();
       waitForBackup(sessionFactory, 5);
       crash(session);
@@ -218,260 +204,5 @@
       return TransportConfigurationUtils.getInVMConnector(live);
    }
 
-   private class BackupSyncDelay implements Interceptor
-   {
 
-      private final ReplicationChannelHandler handler;
-
-      public BackupSyncDelay(ReplicationChannelHandler handler)
-      {
-         this.handler = handler;
-      }
-
-      @Override
-      public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
-      {
-         if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
-         {
-            try
-            {
-               ReplicationEndpoint repEnd = backupServer.getServer().getReplicationEndpoint();
-               handler.addSubHandler(repEnd);
-               Channel repChannel = repEnd.getChannel();
-               repChannel.setHandler(handler);
-               handler.setChannel(repChannel);
-               liveServer.removeInterceptor(this);
-            }
-            catch (Exception e)
-            {
-               throw new RuntimeException(e);
-            }
-         }
-         return true;
-      }
-
-   }
-
-   private static class ReplicationChannelHandler implements ChannelHandler
-   {
-
-      private ReplicationEndpoint handler;
-      private Packet onHold;
-      private Channel channel;
-      public volatile boolean deliver;
-      private boolean mustHold = true;
-
-      public void addSubHandler(ReplicationEndpoint handler)
-      {
-         this.handler = handler;
-      }
-
-      public void setChannel(Channel channel)
-      {
-         this.channel = channel;
-      }
-
-      public void setHold(boolean hold)
-      {
-         mustHold = hold;
-      }
-
-      @Override
-      public void handlePacket(Packet packet)
-      {
-
-         if (onHold != null && deliver)
-         {
-            // Use wrapper to avoid sending a response
-            ChannelWrapper wrapper = new ChannelWrapper(channel);
-            handler.setChannel(wrapper);
-            try
-            {
-               handler.handlePacket(onHold);
-            }
-            finally
-            {
-               handler.setChannel(channel);
-               onHold = null;
-            }
-         }
-
-         if (packet.getType() == PacketImpl.REPLICATION_SYNC && mustHold)
-         {
-            ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
-            if (syncMsg.isUpToDate())
-            {
-               assert onHold == null;
-               onHold = packet;
-               PacketImpl response = new ReplicationResponseMessage();
-               channel.send(response);
-               return;
-            }
-         }
-
-         handler.handlePacket(packet);
-      }
-
-   }
-
-   private static class ChannelWrapper implements Channel
-   {
-
-      private final Channel channel;
-
-      /**
-       * @param connection
-       * @param id
-       * @param confWindowSize
-       */
-      public ChannelWrapper(Channel channel)
-      {
-         this.channel = channel;
-      }
-
-      @Override
-      public String toString()
-      {
-         return "ChannelWrapper(" + channel + ")";
-      }
-
-      @Override
-      public long getID()
-      {
-         return channel.getID();
-      }
-
-      @Override
-      public void send(Packet packet)
-      {
-         // no-op
-         // channel.send(packet);
-      }
-
-      @Override
-      public void sendBatched(Packet packet)
-      {
-         throw new UnsupportedOperationException();
-
-      }
-
-      @Override
-      public void sendAndFlush(Packet packet)
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Packet sendBlocking(Packet packet) throws HornetQException
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void setHandler(ChannelHandler handler)
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void close()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void transferConnection(CoreRemotingConnection newConnection)
-      {
-         throw new UnsupportedOperationException();
-
-      }
-
-      @Override
-      public void replayCommands(int lastConfirmedCommandID)
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public int getLastConfirmedCommandID()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void lock()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void unlock()
-      {
-         throw new UnsupportedOperationException();
-
-      }
-
-      @Override
-      public void returnBlocking()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Lock getLock()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public CoreRemotingConnection getConnection()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void confirm(Packet packet)
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void setCommandConfirmationHandler(CommandConfirmationHandler handler)
-      {
-         throw new UnsupportedOperationException();
-
-      }
-
-      @Override
-      public void flushConfirmations()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void handlePacket(Packet packet)
-      {
-         throw new UnsupportedOperationException();
-
-      }
-
-      @Override
-      public void clearCommands()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public int getConfirmationWindowSize()
-      {
-         throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void setTransferring(boolean transferring)
-      {
-         throw new UnsupportedOperationException();
-      }
-
-   }
 }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-08-10 15:05:24 UTC (rev 11181)
@@ -137,7 +137,7 @@
    {
       if (backupServer != null)
       {
-         // some tests fail the live before the backup is in sync
+         // some tests crash the liveServer before the backupServer is sync'ed
          waitForBackup(sf, 3);
       }
       super.crash(sessions);
@@ -273,6 +273,7 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = consumer.receive(1000);
+         assertNotNull("Just crashed? " + (i == 6) + " " + i, message);
 
          message.acknowledge();
 
@@ -627,7 +628,7 @@
       {
          ClientMessage message = consumer.receive(1000);
 
-         Assert.assertNotNull(message);
+         Assert.assertNotNull("expecting message " + i, message);
 
          assertMessageBody(i, message);
 
@@ -1165,7 +1166,7 @@
          if (isDurable(i))
          {
             ClientMessage message = consumer.receive(1000);
-            Assert.assertNotNull(message);
+            Assert.assertNotNull("expecting durable msg " + i, message);
             assertMessageBody(i, message);
             Assert.assertEquals(i, message.getIntProperty("counter").intValue());
             message.acknowledge();

Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java	2011-08-10 15:05:24 UTC (rev 11181)
@@ -0,0 +1,26 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
+
+public class ReplicatedWithDelayFailoverTest extends ReplicatedFailoverTest
+{
+
+   private BackupSyncDelay syncDelay;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      startBackupServer = false;
+      super.setUp();
+      syncDelay = new BackupSyncDelay(backupServer, liveServer);
+      backupServer.start();
+   }
+
+   @Override
+   protected void crash(ClientSession... sessions) throws Exception
+   {
+      syncDelay.deliverUpToDateMsg();
+      super.crash(sessions);
+   }
+}

Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-08-10 15:05:24 UTC (rev 11181)
@@ -0,0 +1,309 @@
+/**
+ *
+ */
+package org.hornetq.tests.integration.cluster.util;
+
+import java.util.concurrent.locks.Lock;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.CommandConfirmationHandler;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+
+/**
+ * An interceptor to keep a replicated backup server from reaching "up-to-date" status.
+ * <p>
+ * One problem is that we can't add an interceptor to the backup before starting it. So we add the
+ * interceptor to the 'live' which will place a different {@link ChannelHandler} in the backup
+ * during the initialization of replication.
+ * <p>
+ * We need to hijack the replication channel handler, because we need to
+ * <ol>
+ * <li>send an early answer to the {@link PacketImpl#REPLICATION_SYNC} packet that signals being
+ * up-to-date
+ * <li>not send an answer to it, when we deliver the packet later.
+ * </ol>
+ */
+public class BackupSyncDelay implements Interceptor
+{
+
+   private final ReplicationChannelHandler handler = new ReplicationChannelHandler();
+   private final TestableServer backup;
+   private final TestableServer live;
+
+   public void deliverUpToDateMsg()
+   {
+      handler.deliver();
+   }
+
+   public BackupSyncDelay(TestableServer backup, TestableServer live)
+   {
+      assert backup.getServer().getConfiguration().isBackup();
+      assert !live.getServer().getConfiguration().isBackup();
+      this.backup = backup;
+      this.live = live;
+      live.addInterceptor(this);
+   }
+
+   @Override
+   public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+   {
+      if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+      {
+         try
+         {
+            ReplicationEndpoint repEnd = backup.getServer().getReplicationEndpoint();
+            handler.addSubHandler(repEnd);
+            Channel repChannel = repEnd.getChannel();
+            repChannel.setHandler(handler);
+            handler.setChannel(repChannel);
+            live.removeInterceptor(this);
+         }
+         catch (Exception e)
+         {
+            throw new RuntimeException(e);
+         }
+      }
+      return true;
+   }
+
+   public static class ReplicationChannelHandler implements ChannelHandler
+   {
+
+      private ReplicationEndpoint handler;
+      private Packet onHold;
+      private Channel channel;
+      public volatile boolean deliver;
+      private boolean mustHold = true;
+
+      public void addSubHandler(ReplicationEndpoint handler)
+      {
+         this.handler = handler;
+      }
+
+      public synchronized void deliver()
+      {
+         if (onHold == null)
+         {
+            throw new NullPointerException("Don't have the 'sync is done' packet to deliver");
+         }
+         // Use wrapper to avoid sending a response
+         ChannelWrapper wrapper = new ChannelWrapper(channel);
+         handler.setChannel(wrapper);
+         try
+         {
+            handler.handlePacket(onHold);
+         }
+         finally
+         {
+            handler.setChannel(channel);
+            onHold = null;
+         }
+      }
+
+      public void setChannel(Channel channel)
+      {
+         this.channel = channel;
+      }
+
+      public void setHold(boolean hold)
+      {
+         mustHold = hold;
+      }
+
+      @Override
+      public synchronized void handlePacket(Packet packet)
+      {
+
+         if (onHold != null && deliver)
+         {
+            deliver();
+         }
+
+         if (packet.getType() == PacketImpl.REPLICATION_SYNC && mustHold)
+         {
+            ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
+            if (syncMsg.isUpToDate())
+            {
+               assert onHold == null;
+               onHold = packet;
+               PacketImpl response = new ReplicationResponseMessage();
+               channel.send(response);
+               return;
+            }
+         }
+
+         handler.handlePacket(packet);
+      }
+
+   }
+
+   public static class ChannelWrapper implements Channel
+   {
+
+      private final Channel channel;
+
+      /**
+       * @param connection
+       * @param id
+       * @param confWindowSize
+       */
+      public ChannelWrapper(Channel channel)
+      {
+         this.channel = channel;
+      }
+
+      @Override
+      public String toString()
+      {
+         return "ChannelWrapper(" + channel + ")";
+      }
+
+      @Override
+      public long getID()
+      {
+         return channel.getID();
+      }
+
+      @Override
+      public void send(Packet packet)
+      {
+         // no-op
+         // channel.send(packet);
+      }
+
+      @Override
+      public void sendBatched(Packet packet)
+      {
+         throw new UnsupportedOperationException();
+
+      }
+
+      @Override
+      public void sendAndFlush(Packet packet)
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Packet sendBlocking(Packet packet) throws HornetQException
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void setHandler(ChannelHandler handler)
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void close()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void transferConnection(CoreRemotingConnection newConnection)
+      {
+         throw new UnsupportedOperationException();
+
+      }
+
+      @Override
+      public void replayCommands(int lastConfirmedCommandID)
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public int getLastConfirmedCommandID()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void lock()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void unlock()
+      {
+         throw new UnsupportedOperationException();
+
+      }
+
+      @Override
+      public void returnBlocking()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Lock getLock()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public CoreRemotingConnection getConnection()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void confirm(Packet packet)
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void setCommandConfirmationHandler(CommandConfirmationHandler handler)
+      {
+         throw new UnsupportedOperationException();
+
+      }
+
+      @Override
+      public void flushConfirmations()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void handlePacket(Packet packet)
+      {
+         throw new UnsupportedOperationException();
+
+      }
+
+      @Override
+      public void clearCommands()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public int getConfirmationWindowSize()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void setTransferring(boolean transferring)
+      {
+         throw new UnsupportedOperationException();
+      }
+
+   }
+}
\ No newline at end of file



More information about the hornetq-commits mailing list