[jboss-cvs] JBoss Messaging SVN: r7660 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/remoting/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Aug 3 15:11:49 EDT 2009


Author: timfox
Date: 2009-08-03 15:11:48 -0400 (Mon, 03 Aug 2009)
New Revision: 7660

Added:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ChannelManager.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ChannelManagerImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueuePacketHandler.java
Modified:
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
   branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
MT replication

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2009-08-03 19:11:48 UTC (rev 7660)
@@ -42,6 +42,8 @@
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.ChannelManager;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.utils.ExecutorFactory;
@@ -70,6 +72,10 @@
 
    private final Executor globalDepagerExecutor;
 
+   private final ChannelManager channelManager;
+
+   private final MessagingServer server;
+
    private PagingManager pagingManager;
 
    private StorageManager storageManager;
@@ -80,12 +86,19 @@
 
    // Constructors --------------------------------------------------
 
-   public PagingStoreFactoryNIO(final String directory, final ExecutorFactory executorFactory)
+   public PagingStoreFactoryNIO(final String directory,
+                                final ExecutorFactory executorFactory,
+                                final ChannelManager channelManager,
+                                final MessagingServer server)
    {
       this.directory = directory;
 
       this.executorFactory = executorFactory;
 
+      this.channelManager = channelManager;
+
+      this.server = server;
+
       globalDepagerExecutor = executorFactory.getExecutor();
    }
 
@@ -102,7 +115,6 @@
 
    public synchronized PagingStore newStore(final SimpleString destinationName, final AddressSettings settings) throws Exception
    {
-
       return new PagingStoreImpl(pagingManager,
                                  storageManager,
                                  postOffice,
@@ -110,7 +122,9 @@
                                  this,
                                  destinationName,
                                  settings,
-                                 executorFactory.getExecutor());
+                                 executorFactory,
+                                 channelManager,
+                                 server);
    }
 
    /**
@@ -176,7 +190,7 @@
          {
 
             final String guid = file.getName();
-            
+
             final File addressFile = new File(file, ADDRESS_FILE);
 
             if (!addressFile.exists())
@@ -188,7 +202,7 @@
             BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(addressFile)));
 
             String destination;
-            
+
             try
             {
                destination = reader.readLine();
@@ -211,7 +225,9 @@
                                                     this,
                                                     destinationName,
                                                     settings,
-                                                    executorFactory.getExecutor());
+                                                    executorFactory,
+                                                    channelManager,
+                                                    server);
 
             storesReturn.add(store);
          }

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-08-03 19:11:48 UTC (rev 7660)
@@ -30,9 +30,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.jboss.messaging.core.journal.SequentialFile;
@@ -47,13 +45,24 @@
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.server.ChannelManager;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.PagingStorePacketHandler;
+import org.jboss.messaging.core.server.replication.ReplicableCall;
+import org.jboss.messaging.core.server.replication.Replicator;
 import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
+import org.jboss.messaging.core.server.replication.impl.ReplicatorImpl;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
-import org.jboss.messaging.utils.Future;
+import org.jboss.messaging.utils.ExecutorFactory;
 import org.jboss.messaging.utils.SimpleString;
 
 /**
@@ -61,6 +70,7 @@
  * @see PagingStore
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
 public class PagingStoreImpl implements TestSupportPageStore
@@ -110,15 +120,15 @@
 
    private volatile Page currentPage;
 
-   private final ReentrantLock writeLock = new ReentrantLock();
+   private final ReplicationAwareMutex writeLock;
 
    /** 
     * We need to perform checks on currentPage with minimal locking
     * */
    private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
-   
-   //private final Lock lock = ReplicationAwareMutex.createLock().writeLock();
 
+   // private final Lock lock = ReplicationAwareMutex.createLock().writeLock();
+
    private volatile boolean running = false;
 
    // Static --------------------------------------------------------
@@ -142,7 +152,9 @@
                           final PagingStoreFactory storeFactory,
                           final SimpleString storeName,
                           final AddressSettings addressSettings,
-                          final Executor executor)
+                          final ExecutorFactory executorFactory,
+                          final ChannelManager channelManager,
+                          final MessagingServer server) throws Exception
    {
       if (pagingManager == null)
       {
@@ -161,15 +173,43 @@
 
       dropMessagesWhenFull = addressSettings.isDropMessagesWhenFull();
 
-      this.executor = executor;
+      this.executor = executorFactory.getExecutor();
 
       this.pagingManager = pagingManager;
 
       this.fileFactory = fileFactory;
 
       this.storeFactory = storeFactory;
+
+      long id = storageManager.generateUniqueID();
+
+      if (server.getConfiguration().isBackup())
+      {
+         Channel channel = new ChannelImpl(id, executorFactory.getExecutor());
+
+         channel.setHandler(new PagingStorePacketHandler(this, channel));
+
+         channelManager.putChannel(channel);
+      }
+
+      RemotingConnection connection = server.getPooledReplicatingConnection();
+
+      if (connection != null)
+      {
+         Channel replicatingChannel = new ChannelImpl(id, connection);
+
+         replicator = new ReplicatorImpl("paging-store-" + storeName, replicatingChannel);
+      }
+      else
+      {
+         replicator = null;
+      }
+      
+      this.writeLock = new ReplicationAwareMutex("paging-store-" + storeName, 0, false);
    }
 
+   private final Replicator replicator;
+
    // Public --------------------------------------------------------
 
    // PagingStore implementation ------------------------------------
@@ -346,7 +386,7 @@
          currentPageLock.readLock().unlock();
       }
 
-      writeLock.lock();
+      writeLock.lock(1);
 
       try
       {
@@ -367,7 +407,7 @@
             ByteBuffer buff = ByteBuffer.wrap(bytes);
 
             buff.putLong(msg.getMessageID());
-                        
+
             msg.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
          }
 
@@ -501,7 +541,7 @@
       if (running)
       {
          running = false;
-        
+
          if (currentPage != null)
          {
             currentPage.close();
@@ -512,7 +552,7 @@
 
    public void start() throws Exception
    {
-      writeLock.lock();
+      writeLock.lock(2);
 
       try
       {
@@ -604,7 +644,7 @@
 
       // if the first check failed, we do it again under a global currentPageLock
       // (writeLock) this time
-      writeLock.lock();
+      writeLock.lock(3);
 
       try
       {
@@ -632,6 +672,23 @@
     */
    public boolean readPage() throws Exception
    {
+      if (replicator != null)
+      {
+         // A replicator exists: run the depage through it as a ReplicableCall
+         ReplicableCall<Boolean> depageCall = new DepageAction();
+
+         replicator.execute(depageCall, null);
+
+         return depageCall.getResult();
+      }
+
+      // No replicator was created for this store, so depage directly
+
+      return doReadPage();
+   }
+   
+   private boolean doReadPage() throws Exception
+   {
       Page page = depage();
 
       if (page == null)
@@ -654,7 +711,35 @@
       }
 
    }
+   
+   private class DepageAction implements ReplicableCall<Boolean>
+   { // Wraps doReadPage() as a ReplicableCall identified by a REPLICATE_DEPAGE packet
+      public Packet getPacket()
+      {
+         return new PacketImpl(PacketImpl.REPLICATE_DEPAGE);
+      }
 
+      private boolean ok;
+      // Runs doReadPage(); an exception is logged and not rethrown, so ok keeps its previous value (initially false)
+      public void run()
+      {
+         try
+         {
+            ok = doReadPage();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to read page", e);
+         }
+      }
+      // Returns the value last stored by run(), boxed (false if none was stored)
+      public Boolean getResult()
+      {
+         return ok;
+      }
+   }
+   
+
    // TestSupportPageStore ------------------------------------------
 
    public void forceAnotherPage() throws Exception
@@ -672,7 +757,7 @@
     * */
    public Page depage() throws Exception
    {
-      writeLock.lock();
+      writeLock.lock(4);
 
       currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
 

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java	2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java	2009-08-03 19:11:48 UTC (rev 7660)
@@ -37,6 +37,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ACKNOWLEDGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_DEPAGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_QUEUE_DELIVERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REDISTRIBUTION;
@@ -439,6 +440,11 @@
             packet = new PacketImpl(REPLICATE_QUEUE_DELIVERY);
             break;
          } 
+         case REPLICATE_DEPAGE:
+         {
+            packet = new PacketImpl(REPLICATE_DEPAGE);
+            break;
+         } 
          case BACKUP_CONNECTION:
          {
             packet = new PacketImpl(BACKUP_CONNECTION);

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-08-03 19:11:48 UTC (rev 7660)
@@ -162,9 +162,11 @@
    
    public static final byte REPLICATE_QUEUE_DELIVERY = 99;
    
-   public static final byte BACKUP_CONNECTION = 100;
+   public static final byte REPLICATE_DEPAGE = 100;
    
+   public static final byte BACKUP_CONNECTION = 101;
    
+   
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ChannelManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ChannelManager.java	                        (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ChannelManager.java	2009-08-03 19:11:48 UTC (rev 7660)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server;
+
+import org.jboss.messaging.core.remoting.Channel;
+
+/**
+ * A ChannelManager: a registry in which {@link Channel}s are stored, looked up and removed by a long id.
+ * NOTE(review): contracts are not stated here; ChannelManagerImpl keys channels by Channel.getID(),
+ * rejects an already-registered id with IllegalStateException and returns null for an unknown id.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public interface ChannelManager
+{
+   Channel getChannel(long id);
+   
+   void putChannel(Channel channel);
+   
+   Channel removeChannel(long id);
+}

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-08-03 19:11:48 UTC (rev 7660)
@@ -116,6 +116,8 @@
    RemotingConnection getNonPooledReplicatingConnection() throws Exception;
 
    void returnNonPooledReplicatingConnection(RemotingConnection connection);
+   
+   RemotingConnection getPooledReplicatingConnection() throws Exception;
 
    void initialiseBackup(UUID nodeID, long currentMessageID) throws Exception;
 

Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ChannelManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ChannelManagerImpl.java	                        (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ChannelManagerImpl.java	2009-08-03 19:11:48 UTC (rev 7660)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.impl;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.server.ChannelManager;
+
+/**
+ * A ChannelManagerImpl: keeps channels in a ConcurrentHashMap keyed by {@code Channel.getID()}.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class ChannelManagerImpl implements ChannelManager
+{
+   private final ConcurrentMap<Long, Channel> channels = new ConcurrentHashMap<Long, Channel>();
+
+   /** @return the channel registered under {@code id}, or null if there is none */
+   public Channel getChannel(final long id)
+   {
+      return channels.get(id);
+   }
+
+   /** Registers channel atomically (putIfAbsent): an id clash throws IllegalStateException and keeps the existing channel. */
+   public void putChannel(final Channel channel)
+   {
+      if (channels.putIfAbsent(channel.getID(), channel) != null)
+      {
+         throw new IllegalStateException("Already a channel with id " + channel.getID());
+      }
+   }
+
+   /** @return the channel that was registered under {@code id}, or null if there was none */
+   public Channel removeChannel(final long id)
+   {
+      return channels.remove(id);
+   }
+}

Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-08-03 19:11:48 UTC (rev 7660)
@@ -1045,7 +1045,10 @@
 
    protected PagingManager createPagingManager()
    {
-      return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(), executorFactory),
+      return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+                                                             executorFactory,
+                                                             remotingService.getChannelManager(),
+                                                             this),
                                    storageManager,
                                    addressSettingsRepository,
                                    configuration.getPagingMaxGlobalSizeBytes(),

Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java	                        (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java	2009-08-03 19:11:48 UTC (rev 7660)
@@ -0,0 +1,116 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.impl;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_DEPAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
+
+import java.util.List;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
+import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.utils.Triple;
+
+/**
+ * A PagingStorePacketHandler: handles the replication packets received on a paging store's channel.
+ * REPLICATE_LOCK_SEQUENCES stores the sequences carried by the packet; REPLICATE_DEPAGE sets them as the
+ * replay sequences of the current JBMThread, calls pagingStore.readPage() and sends a ReplicationResponseMessage.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class PagingStorePacketHandler implements ChannelHandler
+{
+   private static final Logger log = Logger.getLogger(PagingStorePacketHandler.class);
+   private final PagingStore pagingStore;
+   private volatile List<Triple<Long, Long, Integer>> sequences;
+   private final Channel channel;
+
+   public PagingStorePacketHandler(final PagingStore pagingStore, final Channel channel)
+   {
+      this.pagingStore = pagingStore;
+      this.channel = channel;
+   }
+
+   public void handlePacket(final Packet packet)
+   {
+      switch (packet.getType())
+      {
+         case REPLICATE_LOCK_SEQUENCES:
+         {
+            ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
+
+            sequences = msg.getSequences();
+
+            // log.info("got sequences on queue " + msg.getID());
+            // dumpSequences(sequences);
+
+            break;
+         }
+         case REPLICATE_DEPAGE:
+         {
+            JBMThread thread = JBMThread.currentThread();
+
+            thread.setReplay(sequences);
+
+            try
+            {
+               try
+               {
+                  pagingStore.readPage();
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to read page", e);
+               }
+               channel.send(new ReplicationResponseMessage());
+            }
+            finally
+            {
+               // Always leave replay mode, even if the send (or an Error) aborts this handler
+               thread.setNoReplayOrRecord(12); // NOTE(review): meaning of 12 is not visible here
+            }
+
+            break;
+         }
+         default:
+         {
+            throw new IllegalArgumentException("Invalid packet " + packet);
+         }
+      }
+   }
+
+   private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+   {
+      log.info(Thread.currentThread() + " Got on paging store replication ph Sequences size is " + sequences.size());
+      for (Triple<Long, Long, Integer> sequence: sequences)
+      {
+         log.info(sequence.a + ": " + sequence.b);
+      }
+   }
+}

Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueuePacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueuePacketHandler.java	                        (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueuePacketHandler.java	2009-08-03 19:11:48 UTC (rev 7660)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.impl;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
+
+import java.util.List;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.utils.Triple;
+
+/**
+ * A QueuePacketHandler: handles the replication packets received on a queue's channel.
+ * REPLICATE_LOCK_SEQUENCES stores the sequences carried by the packet; REPLICATE_QUEUE_DELIVERY sets them as
+ * the replay sequences of the current JBMThread, calls queue.deliverOne() and sends a ReplicationResponseMessage.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class QueuePacketHandler implements ChannelHandler
+{
+   private static final Logger log = Logger.getLogger(QueuePacketHandler.class);
+   private final Queue queue;
+   private volatile List<Triple<Long, Long, Integer>> sequences;
+   private final Channel channel;
+
+   public QueuePacketHandler(final Queue queue, final Channel channel)
+   {
+      this.queue = queue;
+      this.channel = channel;
+   }
+
+   public void handlePacket(final Packet packet)
+   {
+      switch (packet.getType())
+      {
+         case REPLICATE_LOCK_SEQUENCES:
+         {
+            ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
+
+            sequences = msg.getSequences();
+
+            // log.info("got sequences on queue " + msg.getID());
+            // dumpSequences(sequences);
+
+            break;
+         }
+         case PacketImpl.REPLICATE_QUEUE_DELIVERY:
+         {
+            // log.info("Got replicated queue delivery");
+
+            JBMThread thread = JBMThread.currentThread();
+
+            thread.setReplay(sequences);
+
+            try
+            {
+               queue.deliverOne();
+               channel.send(new ReplicationResponseMessage());
+            }
+            finally
+            {
+               // Always leave replay mode, even if delivery or the send throws
+               thread.setNoReplayOrRecord(12); // NOTE(review): meaning of 12 is not visible here
+            }
+
+            break;
+         }
+         default:
+         {
+            throw new IllegalArgumentException("Invalid packet " + packet);
+         }
+      }
+   }
+
+   private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+   {
+      log.info(Thread.currentThread() + " Got on queue replication ph Sequences size is " + sequences.size());
+      for (Triple<Long, Long, Integer> sequence : sequences)
+      {
+         log.info(sequence.a + ": " + sequence.b);
+      }
+   }
+}

Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-08-03 19:11:48 UTC (rev 7660)
@@ -233,7 +233,7 @@
    
    protected int getNumberOfMessages()
    {
-      return 500;
+      return 1000;
    }
    
    protected int getNumberOfThreads()




More information about the jboss-cvs-commits mailing list