JBoss hornetq SVN: r11892 - trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-12-09 05:41:07 -0500 (Fri, 09 Dec 2011)
New Revision: 11892
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
added some syncronization on closerunnable
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-09 01:37:20 UTC (rev 11891)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-09 10:41:07 UTC (rev 11892)
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -154,7 +155,7 @@
private final Object waitLock = new Object();
- public final static List<CloseRunnable> CLOSE_RUNNABLES = new ArrayList<CloseRunnable>();
+ public final static List<CloseRunnable> CLOSE_RUNNABLES = Collections.synchronizedList(new ArrayList<CloseRunnable>());
// Static
// ---------------------------------------------------------------------------------------
@@ -1506,10 +1507,13 @@
}
+ private static int size = 0;
public class CloseRunnable implements Runnable
{
private final CoreRemotingConnection conn;
-
+ private int i = 0;
+ private boolean removed = false;
+ private boolean actuallRemoved = false;
private CloseRunnable(CoreRemotingConnection conn)
{
this.conn = conn;
@@ -1533,8 +1537,8 @@
public ClientSessionFactoryImpl stop()
{
+ causeExit();
CLOSE_RUNNABLES.remove(this);
- causeExit();
return ClientSessionFactoryImpl.this;
}
13 years
JBoss hornetq SVN: r11891 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 20:37:20 -0500 (Thu, 08 Dec 2011)
New Revision: 11891
Added:
tags/HornetQ_2_2_8_EAP_GA/
Log:
retagging 2.2.8
13 years
JBoss hornetq SVN: r11890 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 20:36:30 -0500 (Thu, 08 Dec 2011)
New Revision: 11890
Added:
tags/HornetQ_2_2_8_EAP_CR3/
Removed:
tags/HornetQ_2_2_8_EAP_CR1/
Log:
retagging 2.2.8
13 years
JBoss hornetq SVN: r11888 - branches/Branch_2_2_EAP/src/main/org/hornetq/ra.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 20:30:38 -0500 (Thu, 08 Dec 2011)
New Revision: 11888
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAConnectionMetaData.java
Log:
Version changes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAConnectionMetaData.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAConnectionMetaData.java 2011-12-09 01:19:10 UTC (rev 11887)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAConnectionMetaData.java 2011-12-09 01:30:38 UTC (rev 11888)
@@ -113,7 +113,7 @@
HornetQRAConnectionMetaData.log.trace("getJMSProviderName()");
}
- return "2.1";
+ return "2.2";
}
/**
@@ -141,7 +141,7 @@
HornetQRAConnectionMetaData.log.trace("getProviderMinorVersion()");
}
- return 1;
+ return 2;
}
/**
13 years
JBoss hornetq SVN: r11887 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 20:19:10 -0500 (Thu, 08 Dec 2011)
New Revision: 11887
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
I bumped into some error message while testing / validating JBPAPP-6522 - so I'm commiting it as part of this JIRA. This is a minor fix
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-12-08 22:29:24 UTC (rev 11886)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-12-09 01:19:10 UTC (rev 11887)
@@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
@@ -497,6 +498,14 @@
catch (Exception e)
{
log.warn("Unable to announce backup, retrying", e);
+
+ scheduledExecutor.schedule(new Runnable(){
+ public void run()
+ {
+ announceBackup();
+ }
+
+ }, retryInterval, TimeUnit.MILLISECONDS);
}
}
});
13 years
JBoss hornetq SVN: r11886 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 17:29:24 -0500 (Thu, 08 Dec 2011)
New Revision: 11886
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Just improving some log.info
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-08 20:35:26 UTC (rev 11885)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-08 22:29:24 UTC (rev 11886)
@@ -1624,7 +1624,7 @@
for (Pair<Long, Long> msgToDelete : pendingLargeMessages)
{
- log.info("Deleting pending large message as it wasn't completed:" + msgToDelete);
+ log.info("Deleting pending large message as it wasn't completed: LargeMessageID:" + msgToDelete.getB());
LargeServerMessage msg = storageManager.createLargeMessage();
msg.setMessageID(msgToDelete.getB());
msg.setPendingRecordID(msgToDelete.getA());
13 years
JBoss hornetq SVN: r11885 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 15:35:26 -0500 (Thu, 08 Dec 2011)
New Revision: 11885
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java
Log:
fixing test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java 2011-12-08 20:24:48 UTC (rev 11884)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/ExampleListener.java 2011-12-08 20:35:26 UTC (rev 11885)
@@ -5,6 +5,8 @@
import javax.jms.MessageListener;
import javax.jms.TextMessage;
+import org.hornetq.utils.ReusableLatch;
+
/**
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
* @version $Revision: 1 $
@@ -12,6 +14,8 @@
public class ExampleListener implements MessageListener
{
public static String lastMessage = null;
+
+ public static ReusableLatch latch = new ReusableLatch();
public void onMessage(Message message)
{
@@ -24,5 +28,6 @@
throw new RuntimeException(e);
}
System.out.println("MESSAGE RECEIVED: " + lastMessage);
+ latch.countDown();
}
}
13 years
JBoss hornetq SVN: r11884 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 15:24:48 -0500 (Thu, 08 Dec 2011)
New Revision: 11884
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
Log:
fixing test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java 2011-12-08 18:22:55 UTC (rev 11883)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java 2011-12-08 20:24:48 UTC (rev 11884)
@@ -1,5 +1,7 @@
package org.hornetq.tests.integration.spring;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
import org.hornetq.core.logging.Logger;
@@ -34,9 +36,12 @@
{
MessageSender sender = (MessageSender)context.getBean("MessageSender");
System.out.println("Sending message...");
+ ExampleListener.latch.countUp();
sender.send("Hello world");
- Thread.sleep(100);
+ ExampleListener.latch.await(10, TimeUnit.SECONDS);
+ Thread.sleep(500);
Assert.assertEquals(ExampleListener.lastMessage, "Hello world");
+ System.out.println("done!");
((HornetQConnectionFactory)sender.getConnectionFactory()).close();
}
finally
13 years
JBoss hornetq SVN: r11883 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 13:22:55 -0500 (Thu, 08 Dec 2011)
New Revision: 11883
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-12-08 18:16:25 UTC (rev 11882)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-12-08 18:22:55 UTC (rev 11883)
@@ -68,6 +68,10 @@
private final long maxRetryInterval;
private final int minLargeMessageSize;
+
+ // At this point this is only changed on testcases
+ // The bridge shouldn't be sending blocking anyways
+ private long callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
/**
* For backward compatibility on the API... no MinLargeMessage on this constructor
@@ -445,4 +449,26 @@
{
this.password = password;
}
+
+
+ /**
+ * @return the callTimeout
+ */
+ public long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
+ /**
+ *
+ * At this point this is only changed on testcases
+ * The bridge shouldn't be sending blocking anyways
+ * @param callTimeout the callTimeout to set
+ */
+ public void setCallTimeout(long callTimeout)
+ {
+ this.callTimeout = callTimeout;
+ }
+
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-12-08 18:16:25 UTC (rev 11882)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-12-08 18:22:55 UTC (rev 11883)
@@ -572,11 +572,19 @@
{
producer.send(dest, message);
}
- catch (HornetQException e)
+ catch (final HornetQException e)
{
log.warn("Unable to send message " + ref + ", will try again once bridge reconnects", e);
refs.remove(ref);
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ connectionFailed(e, false);
+ }
+ });
return HandleStatus.BUSY;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-12-08 18:16:25 UTC (rev 11882)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-12-08 18:22:55 UTC (rev 11883)
@@ -471,6 +471,12 @@
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
+
+ // This will be set to 30s unless it's changed from embedded / testing
+ // there is no reason to exception the config for this timeout
+ // since the Bridge is supposed to be non-blocking and fast
+ // We may expose this if we find a good use case
+ serverLocator.setCallTimeout(config.getCallTimeout());
if (!config.isUseDuplicateDetection())
{
log.debug("Bridge " + config.getName() +
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-12-08 18:16:25 UTC (rev 11882)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-12-08 18:22:55 UTC (rev 11883)
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +25,8 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -39,6 +42,9 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
@@ -46,6 +52,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
@@ -263,6 +270,220 @@
}
+
+ public void testLostMessageSimpleMessage() throws Exception
+ {
+ internalTestMessageLoss(false);
+ }
+
+ public void testLostMessageLargeMessage() throws Exception
+ {
+ internalTestMessageLoss(true);
+ }
+
+ /** This test will ignore messages
+ What will cause the bridge to fail with a timeout
+ The bridge should still recover the failure and reconnect on that case */
+ public void internalTestMessageLoss(final boolean largeMessage) throws Exception
+ {
+ class MyInterceptor implements Interceptor
+ {
+ public boolean ignoreSends = true;
+ public CountDownLatch latch;
+
+ MyInterceptor(int numberOfIgnores)
+ {
+ latch = new CountDownLatch(numberOfIgnores);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.Interceptor#intercept(org.hornetq.core.protocol.core.Packet, org.hornetq.spi.core.protocol.RemotingConnection)
+ */
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (ignoreSends && packet instanceof SessionSendMessage ||
+ ignoreSends && packet instanceof SessionSendContinuationMessage && !((SessionSendContinuationMessage)packet).isContinues())
+ {
+ System.out.println("Ignored");
+ latch.countDown();
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ }
+
+ MyInterceptor myInterceptor = new MyInterceptor(3);
+
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+ ServerLocator locator = null;
+ try
+ {
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+
+ HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ connectors.put(server1tc.getName(), server1tc);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ final int messageSize = 1024;
+
+ final int numMessages = 1;
+
+ ArrayList<String> connectorConfig = new ArrayList<String>();
+ connectorConfig.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ // Choose confirmation size to make sure acks
+ // are sent
+ numMessages * messageSize / 2,
+ connectorConfig,
+ false,
+ ConfigurationImpl.DEFAULT_CLUSTER_USER,
+ ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ bridgeConfiguration.setCallTimeout(500);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+
+ server1.getRemotingService().addInterceptor(myInterceptor);
+
+ server0.start();
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final byte[] bytes = new byte[messageSize];
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+
+ if (largeMessage)
+ {
+ message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
+ }
+
+ message.putIntProperty(propKey, i);
+
+ message.getBodyBuffer().writeBytes(bytes);
+
+ producer0.send(message);
+ }
+
+ myInterceptor.latch.await();
+ myInterceptor.ignoreSends = false;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(30000);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ if (largeMessage)
+ {
+ readMessages(message);
+ }
+
+ message.acknowledge();
+ }
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+ }
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+
+ assertEquals(0, loadQueues(server0).size());
+
+ }
+
/**
* @param server1Params
*/
13 years