[hornetq-commits] JBoss hornetq SVN: r8280 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 13 13:53:48 EST 2009


Author: jmesnil
Date: 2009-11-13 13:53:47 -0500 (Fri, 13 Nov 2009)
New Revision: 8280

Added:
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
   trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java
   trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java
Modified:
   trunk/src/main/org/hornetq/core/persistence/StorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
HORNETQ-218: Incorrect order when persistent and non-persistent messages are sent over replication

* in PostOffice.processRoute(), when the message is non-persistent and the storage manager is replicated, synchronize the storage manager to ensure ordered delivery between persistent and non-persistent messages

Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -147,4 +147,6 @@
 
 
    void deleteGrouping(GroupBinding groupBinding) throws Exception;
+
+   void sync();
 }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -498,6 +498,14 @@
       messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
    }
 
+   public void sync()
+   {
+      if (replicator != null)
+      {
+         replicator.sync();
+      }
+   }
+
    // Transactional operations
 
    public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
@@ -1901,5 +1909,7 @@
       }
 
    }
+   
 
+
 }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -66,6 +66,11 @@
    {
       this.id = id;
    }
+   
+   public void sync()
+   {
+      // NO OP
+   }
 
    public void addQueueBinding(final Binding queueBinding) throws Exception
    {

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -780,6 +780,7 @@
          // First send a reset message
 
          ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
+//         message.setDurable(true);
          message.setBody(ChannelBuffers.EMPTY_BUFFER);
          message.setDestination(queueName);
          message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
@@ -913,6 +914,13 @@
                }
             }
          }
+         else
+         {
+            if (storageManager.isReplicated())
+            {
+               storageManager.sync();
+            }
+         }
 
          message.incrementRefCount(reference);
       }
@@ -969,6 +977,7 @@
       message.setBody(ChannelBuffers.EMPTY_BUFFER);
 
       message.setDestination(queueName);
+//      message.setDurable(true);
 
       String uid = UUIDGenerator.getInstance().generateStringUUID();
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -29,6 +29,7 @@
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
@@ -93,6 +94,7 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -414,6 +416,11 @@
             packet = new ReplicationDeleteMessage();
             break;
          }
+         case REPLICATION_SYNC:
+         {
+            packet = new ReplicationSyncContextMessage();
+            break;
+         }
          case REPLICATION_DELETE_TX:
          {
             packet = new ReplicationDeleteTXMessage();
@@ -468,7 +475,7 @@
          {
             packet = new SessionForceConsumerDelivery();
             break;
-         }        
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -172,6 +172,8 @@
    public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 91;
    
    public static final byte REPLICATION_COMPARE_DATA = 92;
+   
+   public static final byte REPLICATION_SYNC = 93;
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Added: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+
+/**
+ * Message sent when a replication context completes without any persistent data having been replicated.
+ * In that case we still need a round trip over the cluster to make sure the data is sent in the right order.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationSyncContextMessage extends PacketImpl
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationSyncContextMessage()
+   {
+      super(REPLICATION_SYNC);
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE;
+
+   }
+
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+   }
+
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -88,4 +88,6 @@
     */
    void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
 
+   void sync();
+
 }

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -17,6 +17,7 @@
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -162,6 +163,11 @@
             handleCompareDataMessage((ReplicationCompareDataMessage)packet);
             response = new NullResponseMessage();
          }
+         else if (packet.getType() == REPLICATION_SYNC)
+         {
+            // https://jira.jboss.org/jira/browse/HORNETQ-218
+            // Nothing to be done, we just needed a round trip to process events in order
+         }
          else
          {
             log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -34,6 +34,7 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -277,6 +278,14 @@
          sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId));
       }
    }
+   
+   public void sync()
+   {
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationSyncContextMessage());
+      }
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long, byte[])

Added: trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -0,0 +1,21 @@
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+
+public class DropForceConsumerDeliveryInterceptor implements Interceptor
+{
+   public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+   {
+      if (packet.getType() == PacketImpl.SESS_FORCE_CONSUMER_DELIVERY)
+      {
+         return false;
+      } else
+      {
+         return true;
+      }
+   }
+}
\ No newline at end of file

Added: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class ReceiveImmediate2Test extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(ReceiveImmediate2Test.class);
+
+   private HornetQServer server;
+
+   private final SimpleString QUEUE = new SimpleString("ReceiveImmediateTest.queue");
+   
+   private final SimpleString ADDRESS = new SimpleString("ReceiveImmediateTest.address");
+   
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      Configuration config = createDefaultConfig(false);
+      config.getInterceptorClassNames().add(DropForceConsumerDeliveryInterceptor.class.getName());
+      server = HornetQ.newHornetQServer(config, false);
+
+      server.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+
+      server = null;
+
+      super.tearDown();
+   }
+
+   private ClientSessionFactory sf;
+
+   public void testConsumerReceiveImmediateDoesNotHang() throws Exception
+   {
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnAcknowledge(true);
+      sf.setAckBatchSize(0);
+      sf.setReconnectAttempts(1);
+      sf.setFailoverOnServerShutdown(true);
+      
+      final ClientSession session = sf.createSession(false, true, false);
+
+      session.createQueue(ADDRESS, QUEUE, null, false);
+
+      final ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+      session.start();
+      
+      Runnable r = new Runnable()
+      {
+         
+         public void run()
+         {
+            try
+            {
+               Thread.sleep(2000);
+               //((ClientSessionInternal)session).getConnection().fail(new HornetQException(HornetQException.NOT_CONNECTED));
+               //session.close();
+               consumer.close();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+
+     long start = System.currentTimeMillis();
+
+      new Thread(r).start();
+      
+      ClientMessage message = consumer.receiveImmediate();
+      assertNull(message);
+
+      long end = System.currentTimeMillis();
+      assertTrue(end - start >= 2000);
+      session.close();
+
+      sf.close();
+   }
+}

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-13 18:53:47 UTC (rev 8280)
@@ -978,6 +978,10 @@
       {
          // To change body of implemented methods use File | Settings | File Templates.
       }
+      
+      public void sync()
+      {
+      }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)



More information about the hornetq-commits mailing list