Author: jmesnil
Date: 2009-11-13 13:53:47 -0500 (Fri, 13 Nov 2009)
New Revision: 8280
Added:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java
Modified:
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
HORNETQ-218: Incorrect order when persistent and non-persistent messages are sent over
replication
* in PostOffice.processRoute(), when the message is non-persistent and the storage manager
is replicated, synchronize the storage manager to ensure ordered delivery between
persistent and non-persistent messages
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-13 10:35:30
UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-13 18:53:47
UTC (rev 8280)
@@ -147,4 +147,6 @@
void deleteGrouping(GroupBinding groupBinding) throws Exception;
+
+ void sync();
}
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-13
10:35:30 UTC (rev 8279)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-13
18:53:47 UTC (rev 8280)
@@ -498,6 +498,14 @@
messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
}
+ public void sync()
+ {
+ if (replicator != null)
+ {
+ replicator.sync();
+ }
+ }
+
// Transactional operations
public void storeMessageTransactional(final long txID, final ServerMessage message)
throws Exception
@@ -1901,5 +1909,7 @@
}
}
+
+
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-13
10:35:30 UTC (rev 8279)
+++
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-13
18:53:47 UTC (rev 8280)
@@ -66,6 +66,11 @@
{
this.id = id;
}
+
+ public void sync()
+ {
+ // NO OP
+ }
public void addQueueBinding(final Binding queueBinding) throws Exception
{
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-13
10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-13
18:53:47 UTC (rev 8280)
@@ -780,6 +780,7 @@
// First send a reset message
ServerMessage message = new
ServerMessageImpl(storageManager.generateUniqueID());
+// message.setDurable(true);
message.setBody(ChannelBuffers.EMPTY_BUFFER);
message.setDestination(queueName);
message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
@@ -913,6 +914,13 @@
}
}
}
+ else
+ {
+ if (storageManager.isReplicated())
+ {
+ storageManager.sync();
+ }
+ }
message.incrementRefCount(reference);
}
@@ -969,6 +977,7 @@
message.setBody(ChannelBuffers.EMPTY_BUFFER);
message.setDestination(queueName);
+// message.setDurable(true);
String uid = UUIDGenerator.getInstance().generateStringUUID();
Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-13 10:35:30
UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-13 18:53:47
UTC (rev 8280)
@@ -29,6 +29,7 @@
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
@@ -93,6 +94,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -414,6 +416,11 @@
packet = new ReplicationDeleteMessage();
break;
}
+ case REPLICATION_SYNC:
+ {
+ packet = new ReplicationSyncContextMessage();
+ break;
+ }
case REPLICATION_DELETE_TX:
{
packet = new ReplicationDeleteTXMessage();
@@ -468,7 +475,7 @@
{
packet = new SessionForceConsumerDelivery();
break;
- }
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-13
10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-13
18:53:47 UTC (rev 8280)
@@ -172,6 +172,8 @@
public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 91;
public static final byte REPLICATION_COMPARE_DATA = 92;
+
+ public static final byte REPLICATION_SYNC = 93;
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Added:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
(rev 0)
+++
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java 2009-11-13
18:53:47 UTC (rev 8280)
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+
+/**
+ * Message sent when a Replication Context is complete without any persistence replicated.
+ * In that case we still need a round trip over the cluster to make sure the data is sent in the right order.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationSyncContextMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationSyncContextMessage()
+ {
+ super(REPLICATION_SYNC);
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE;
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-13
10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-13
18:53:47 UTC (rev 8280)
@@ -88,4 +88,6 @@
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
+ void sync();
+
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-13
10:35:30 UTC (rev 8279)
+++
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-13
18:53:47 UTC (rev 8280)
@@ -17,6 +17,7 @@
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -162,6 +163,11 @@
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
}
+ else if (packet.getType() == REPLICATION_SYNC)
+ {
+ // https://jira.jboss.org/jira/browse/HORNETQ-218
+ // Nothing to be done, we just needed a round trip to process events in order
+ }
else
{
log.warn("Packet " + packet + " can't be processed by the
ReplicationEndpoint");
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-13
10:35:30 UTC (rev 8279)
+++
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-13
18:53:47 UTC (rev 8280)
@@ -34,6 +34,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -277,6 +278,14 @@
sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId));
}
}
+
+ public void sync()
+ {
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationSyncContextMessage());
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long,
byte[])
Added:
trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java 2009-11-13
18:53:47 UTC (rev 8280)
@@ -0,0 +1,21 @@
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+
+public class DropForceConsumerDeliveryInterceptor implements Interceptor
+{
+ public boolean intercept(Packet packet, RemotingConnection connection) throws
HornetQException
+ {
+ if (packet.getType() == PacketImpl.SESS_FORCE_CONSUMER_DELIVERY)
+ {
+ return false;
+ } else
+ {
+ return true;
+ }
+ }
+}
\ No newline at end of file
Added: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java 2009-11-13
18:53:47 UTC (rev 8280)
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class ReceiveImmediate2Test extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ReceiveImmediate2Test.class);
+
+ private HornetQServer server;
+
+ private final SimpleString QUEUE = new
SimpleString("ReceiveImmediateTest.queue");
+
+ private final SimpleString ADDRESS = new
SimpleString("ReceiveImmediateTest.address");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Configuration config = createDefaultConfig(false);
+
config.getInterceptorClassNames().add(DropForceConsumerDeliveryInterceptor.class.getName());
+ server = HornetQ.newHornetQServer(config, false);
+
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ private ClientSessionFactory sf;
+
+ public void testConsumerReceiveImmediateDoesNotHang() throws Exception
+ {
+ sf = new ClientSessionFactoryImpl(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+ sf.setAckBatchSize(0);
+ sf.setReconnectAttempts(1);
+ sf.setFailoverOnServerShutdown(true);
+
+ final ClientSession session = sf.createSession(false, true, false);
+
+ session.createQueue(ADDRESS, QUEUE, null, false);
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+ session.start();
+
+ Runnable r = new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(2000);
+ //((ClientSessionInternal)session).getConnection().fail(new HornetQException(HornetQException.NOT_CONNECTED));
+ //session.close();
+ consumer.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ long start = System.currentTimeMillis();
+
+ new Thread(r).start();
+
+ ClientMessage message = consumer.receiveImmediate();
+ assertNull(message);
+
+ long end = System.currentTimeMillis();
+ assertTrue(end - start >= 2000);
+ session.close();
+
+ sf.close();
+ }
+}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-13
10:35:30 UTC (rev 8279)
+++
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-13
18:53:47 UTC (rev 8280)
@@ -978,6 +978,10 @@
{
// To change body of implemented methods use File | Settings | File Templates.
}
+
+ public void sync()
+ {
+ }
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager,
java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)