Author: clebert.suconic(a)jboss.com
Date: 2011-04-05 17:50:24 -0400 (Tue, 05 Apr 2011)
New Revision: 10456
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6252 - fixing failover during flow control
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-04-05 16:17:19 UTC (rev 10455)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-04-05 21:50:24 UTC (rev 10456)
@@ -83,9 +83,13 @@
semaphore.drainPermits();
+ int beforeFailure = arriving;
+
arriving = 0;
- checkCredits(windowSize * 2);
+ // If we are waiting for more credits than what's configured, then we need to use what we tried before,
+ // otherwise the client may starve as the credit will never arrive
+ checkCredits(Math.max(windowSize * 2, beforeFailure));
}
public void close()
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-04-05 16:17:19 UTC (rev 10455)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-04-05 21:50:24 UTC (rev 10456)
@@ -506,7 +506,7 @@
}
}
- public void afterRollback()
+ public synchronized void afterRollback()
{
if (operations != null)
{
@@ -517,7 +517,7 @@
}
}
- public void beforeCommit() throws Exception
+ public synchronized void beforeCommit() throws Exception
{
if (operations != null)
{
@@ -528,7 +528,7 @@
}
}
- public void beforePrepare() throws Exception
+ public synchronized void beforePrepare() throws Exception
{
if (operations != null)
{
@@ -539,7 +539,7 @@
}
}
- public void beforeRollback() throws Exception
+ public synchronized void beforeRollback() throws Exception
{
if (operations != null)
{
@@ -550,7 +550,7 @@
}
}
- public void afterPrepare()
+ public synchronized void afterPrepare()
{
if (operations != null)
{
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java 2011-04-05 21:50:24 UTC (rev 10456)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+
+/**
+ * Validates failover when the message size is larger than the producer flow-control
+ * window but still smaller than minLargeMessageSize.
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class AlmostLargeAsynchronousFailoverTest extends AsynchronousFailoverTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void createConfigs() throws Exception
+ {
+ super.createConfigs();
+ liveServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+ backupServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+ }
+
+ protected ServerLocatorInternal getServerLocator() throws Exception
+ {
+ ServerLocatorInternal locator = super.getServerLocator();
+ locator.setMinLargeMessageSize(1024 * 1024);
+ locator.setProducerWindowSize(10 * 1024);
+ return locator;
+ }
+
+ protected void addPayload(ClientMessage message)
+ {
+ message.putBytesProperty("payload", new byte[20 * 1024]);
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-04-05 16:17:19 UTC (rev 10455)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-04-05 21:50:24 UTC (rev 10456)
@@ -243,6 +243,10 @@
DelegatingSession.debug = false;
}
}
+
+ protected void addPayload(ClientMessage msg)
+ {
+ }
private void doTestNonTransactional(final TestRunner runner) throws Exception
{
@@ -274,6 +278,8 @@
message.getBodyBuffer().writeString("message" + i);
message.putIntProperty("counter", i);
+
+ addPayload(message);
producer.send(message);
@@ -402,7 +408,10 @@
message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new
SimpleString("id:" + i +
",exec:" +
executionId));
+
+ addPayload(message);
+
producer.send(message);
}
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-04-05 21:50:24 UTC (rev 10456)
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+
+/**
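+ * Exercises failover while a producer is under flow control: a client interceptor calls
+ * crash(...) when the second SessionProducerCreditsMessage arrives, while the producer
+ * is sending messages larger than its producer window.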
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class FailoverOnFlowControlTest extends FailoverTestBase
+{
+
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+
+ public void testOverflowSend() throws Exception
+ {
+ ServerLocator locator = getServerLocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
+ locator.setProducerWindowSize(1000);
+ final ArrayList<ClientSession> sessionList = new
ArrayList<ClientSession>();
+ Interceptor interceptorClient = new Interceptor()
+ {
+ AtomicInteger count = new AtomicInteger(0);
+ public boolean intercept(Packet packet, RemotingConnection connection) throws
HornetQException
+ {
+ System.out.println("Intercept..." + packet.getClass().getName());
+
+ if (packet instanceof SessionProducerCreditsMessage )
+ {
+ SessionProducerCreditsMessage credit =
(SessionProducerCreditsMessage)packet;
+
+ System.out.println("Credits: " + credit.getCredits());
+ if (count.incrementAndGet() == 2)
+ {
+ try
+ {
+ crash(sessionList.get(0));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+ };
+
+ locator.addInterceptor(interceptorClient);
+
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ ClientSession session = sf.createSession(true, true);
+ sessionList.add(session);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ message.getBodyBuffer().writeBytes(new byte[5000]);
+
+ message.putIntProperty("counter", i);
+
+ producer.send(message);
+ }
+
+ session.close();
+
+ locator.close();
+ }
+
+
+ protected void createConfigs() throws Exception
+ {
+ super.createConfigs();
+ liveServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+ backupServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
+ }
+
+ protected ServerLocatorInternal getServerLocator() throws Exception
+ {
+ ServerLocatorInternal locator = super.getServerLocator();
+ locator.setMinLargeMessageSize(1024 * 1024);
+ locator.setProducerWindowSize(10 * 1024);
+ return locator;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(final boolean
live)
+ {
+ return getInVMTransportAcceptorConfiguration(live);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(final boolean
live)
+ {
+ return getInVMConnectorTransportConfiguration(live);
+ }
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2011-04-05 16:17:19 UTC (rev 10455)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2011-04-05 21:50:24 UTC (rev 10456)
@@ -16,15 +16,20 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.*;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.server.HornetQServer;
@@ -77,7 +82,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -94,10 +99,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
-
System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -139,6 +144,77 @@
}
/*
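+ * Test failure on connection while the producer is under flow control: the connection is failed
+ * when the second SessionProducerCreditsMessage arrives. The server is still up, so the client
+ * should immediately reconnect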
+ */
+ public void testOverflowCredits() throws Exception
+ {
+ final long retryInterval = 500;
+
+ final double retryMultiplier = 1d;
+
+ final int reconnectAttempts = 1;
+
+ locator.setRetryInterval(retryInterval);
+ locator.setRetryIntervalMultiplier(retryMultiplier);
+ locator.setReconnectAttempts(reconnectAttempts);
+ locator.setConfirmationWindowSize(1024 * 1024);
+ locator.setProducerWindowSize(1000);
+
+ final AtomicInteger count = new AtomicInteger(0);
+
+ Interceptor intercept = new Interceptor()
+ {
+
+ public boolean intercept(Packet packet, RemotingConnection connection) throws
HornetQException
+ {
+ System.out.println("Intercept..." + packet.getClass().getName());
+
+ if (packet instanceof SessionProducerCreditsMessage )
+ {
+ SessionProducerCreditsMessage credit =
(SessionProducerCreditsMessage)packet;
+
+ System.out.println("Credits: " + credit.getCredits());
+ if (count.incrementAndGet() == 2)
+ {
+ System.out.println("Failing");
+ connection.fail(new HornetQException(1, "bye"));
+ return false;
+ }
+ }
+ return true;
+ }
+ };
+
+ locator.addInterceptor(intercept);
+
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ReattachTest.ADDRESS, ReattachTest.ADDRESS, null, false);
+
+ ClientProducer producer = session.createProducer(ReattachTest.ADDRESS);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBodyBuffer().writeBytes(new byte[5000]);
+ producer.send(message);
+ }
+
+ session.close();
+
+ sf.close();
+ }
+
+ /*
* Test failure on connection, simulate failure to create connection for a while, then
* allow connection to be recreated
*/
@@ -154,7 +230,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -167,10 +243,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -244,7 +320,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -277,10 +353,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -358,7 +434,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -371,10 +447,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -448,7 +524,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ final ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
session = sf.createSession();
@@ -558,7 +634,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ final ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
InVMConnector.failOnCreateConnection = true;
@@ -656,7 +732,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -697,11 +773,11 @@
//
// //Should throw exception since didn't reconnect
- //
+ //
// try
// {
// session.start();
- //
+ //
// fail("Should throw exception");
// }
// catch (HornetQException e)
@@ -728,7 +804,7 @@
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -741,10 +817,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -791,12 +867,11 @@
final int reconnectAttempts = -1;
-
locator.setRetryInterval(retryInterval);
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -809,10 +884,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -884,12 +959,11 @@
final int reconnectAttempts = -1;
-
locator.setRetryInterval(retryInterval);
locator.setRetryIntervalMultiplier(retryMultiplier);
locator.setReconnectAttempts(reconnectAttempts);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -902,10 +976,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -967,7 +1041,7 @@
locator.setReconnectAttempts(reconnectAttempts);
locator.setMaxRetryInterval(maxRetryInterval);
locator.setConfirmationWindowSize(1024 * 1024);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -980,10 +1054,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -1045,7 +1119,7 @@
service.start();
- locator = createFactory(false);
+ locator = createFactory(false);
}
@Override
@@ -1054,7 +1128,7 @@
InVMConnector.resetFailures();
locator.close();
-
+
service.stop();
Assert.assertEquals(0, InVMRegistry.instance.size());