JBoss hornetq SVN: r11594 - trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-25 10:51:22 -0400 (Tue, 25 Oct 2011)
New Revision: 11594
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java
Log:
Merge JBPAPP-7353 fixes to trunk.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2011-…
[View More]10-25 14:42:44 UTC (rev 11593)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2011-10-25 14:51:22 UTC (rev 11594)
@@ -14,7 +14,7 @@
package org.hornetq.core.remoting.impl.netty;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Semaphore;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -35,7 +35,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* @version <tt>$Revision$</tt>
*/
public class NettyConnection implements Connection
@@ -60,10 +60,10 @@
private volatile HornetQBuffer batchBuffer;
- private final AtomicBoolean writeLock = new AtomicBoolean(false);
-
- private Set<ReadyListener> readyListeners = new ConcurrentHashSet<ReadyListener>();
+ private final Semaphore writeLock = new Semaphore(1);
+ private final Set<ReadyListener> readyListeners = new ConcurrentHashSet<ReadyListener>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -75,7 +75,7 @@
{
this(null, channel, listener, batchingEnabled, directDeliver);
}
-
+
public NettyConnection(final Acceptor acceptor,
final Channel channel,
final ConnectionLifeCycleListener listener,
@@ -152,7 +152,7 @@
return;
}
- if (writeLock.compareAndSet(false, true))
+ if (writeLock.tryAcquire())
{
try
{
@@ -165,7 +165,7 @@
}
finally
{
- writeLock.set(false);
+ writeLock.release();
}
}
}
@@ -177,11 +177,9 @@
public void write(HornetQBuffer buffer, final boolean flush, final boolean batched)
{
- while (!writeLock.compareAndSet(false, true))
+ try
{
- Thread.yield();
- }
-
+ writeLock.acquire();
try
{
if (batchBuffer == null && batchingEnabled && batched && !flush)
@@ -243,8 +241,13 @@
}
finally
{
- writeLock.set(false);
+ writeLock.release();
+ }
}
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
}
public String getRemoteAddress()
@@ -256,17 +259,17 @@
{
return directDeliver;
}
-
+
public void addReadyListener(final ReadyListener listener)
{
readyListeners.add(listener);
}
-
+
public void removeReadyListener(final ReadyListener listener)
{
readyListeners.remove(listener);
}
-
+
public void fireReady(final boolean ready)
{
for (ReadyListener listener: readyListeners)
[View Less]
13 years, 2 months
JBoss hornetq SVN: r11593 - branches/Branch_2_2_EAP/src/main/org/hornetq/ra.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-10-25 10:42:44 -0400 (Tue, 25 Oct 2011)
New Revision: 11593
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
Log:
regeister recovery at startup
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/…
[View More]main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2011-10-25 13:09:53 UTC (rev 11592)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAManagedConnectionFactory.java 2011-10-25 14:42:44 UTC (rev 11593)
@@ -138,6 +138,11 @@
cm);
}
+ if (connectionFactory == null)
+ {
+ connectionFactory = ra.createHornetQConnectionFactory(mcfProperties);
+ resourceRecovery = ra.getRecoveryManager().register(connectionFactory, null, null);
+ }
return cf;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-10-25 13:09:53 UTC (rev 11592)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-10-25 14:42:44 UTC (rev 11593)
@@ -218,6 +218,18 @@
this.ctx = ctx;
+ if (!configured.getAndSet(true))
+ {
+ try
+ {
+ setup();
+ }
+ catch (HornetQException e)
+ {
+ throw new ResourceAdapterInternalException("Unable to create activation", e);
+ }
+ }
+
HornetQResourceAdapter.log.info("HornetQ resource adaptor started");
}
[View Less]
13 years, 2 months
JBoss hornetq SVN: r11592 - trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-25 09:09:53 -0400 (Tue, 25 Oct 2011)
New Revision: 11592
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Fix dead-lock between connecting factories and serverLocator.
ServerLocator will be locked by ClientSessionFactoryInternal's which are trying
to connect. So we must call causeExit() on these before trying to acquire the
lock on ServerLocator.close()
Modified: trunk/hornetq-core/src/main/java/org/hornetq/…
[View More]core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-25 12:32:41 UTC (rev 11591)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-25 13:09:53 UTC (rev 11592)
@@ -70,6 +70,7 @@
private transient String identity;
private final Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+ private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>();
private TransportConfiguration[] initialConnectors;
@@ -624,11 +625,10 @@
threadPool,
scheduledThreadPool,
interceptors);
-
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
-
- addFactory(factory);
-
+ connectingFactories.add(factory);
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ connectingFactories.remove(factory);
+ addFactory(factory);
return factory;
}
}
@@ -689,11 +689,17 @@
threadPool,
scheduledThreadPool,
interceptors);
+ connectingFactories.add(factory);
factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+ connectingFactories.remove(factory);
}
catch (HornetQException e)
{
- factory.close();
+ if (factory != null)
+ {
+ connectingFactories.remove(factory);
+ factory.close();
+ }
factory = null;
if (e.getCode() == HornetQException.NOT_CONNECTED)
{
@@ -743,7 +749,6 @@
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster topology. Group:" + discoveryGroup);
}
-
}
addFactory(factory);
@@ -1214,6 +1219,11 @@
staticConnector.disconnect();
}
+ for (ClientSessionFactoryInternal factory : connectingFactories)
+ {
+ factory.causeExit();
+ factory.close();
+ }
synchronized (this)
{
Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
@@ -1464,10 +1474,19 @@
public synchronized void addFactory(ClientSessionFactoryInternal factory)
{
- if (factory != null)
+ if (factory == null)
{
- TransportConfiguration backup = null;
+ return;
+ }
+ if (closed || closing)
+ {
+ factory.close();
+ return;
+ }
+
+ TransportConfiguration backup = null;
+
if (topology != null)
{
backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
@@ -1475,7 +1494,7 @@
factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
factories.add(factory);
- }
+
}
final class StaticConnector implements Serializable
[View Less]
13 years, 2 months
JBoss hornetq SVN: r11591 - trunk/hornetq-core/src/test/java/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-25 08:32:41 -0400 (Tue, 25 Oct 2011)
New Revision: 11591
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
Log:
Fix NPE affecting Hudson tests.
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-10-25 12:32:28 UTC (rev 11590)
+++ trunk/hornetq-…
[View More]core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-10-25 12:32:41 UTC (rev 11591)
@@ -924,14 +924,24 @@
{
List<ClientSessionFactoryImpl.CloseRunnable> closeRunnables = new ArrayList<ClientSessionFactoryImpl.CloseRunnable>(ClientSessionFactoryImpl.CLOSE_RUNNABLES);
ArrayList<Exception> exceptions = new ArrayList<Exception>();
- if(!closeRunnables.isEmpty())
+ try
{
+ if (!closeRunnables.isEmpty())
+ {
for (ClientSessionFactoryImpl.CloseRunnable closeRunnable : closeRunnables)
{
- exceptions.add(closeRunnable.stop().e);
+ if (closeRunnable != null)
+ {
+ exceptions.add(closeRunnable.stop().e);
+ }
+ }
}
}
- cleanupPools();
+ finally
+ {
+ cleanupPools();
+ }
+
//clean up pools before failing
if(!exceptions.isEmpty())
{
[View Less]
13 years, 2 months
JBoss hornetq SVN: r11590 - trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-25 08:32:28 -0400 (Tue, 25 Oct 2011)
New Revision: 11590
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Stop creation of connectors if locator is closed or closing.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.…
[View More]java 2011-10-25 12:32:12 UTC (rev 11589)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-25 12:32:28 UTC (rev 11590)
@@ -573,19 +573,15 @@
public ClientSessionFactoryInternal connect() throws Exception
{
- ClientSessionFactoryInternal sf;
// static list of initial connectors
if (initialConnectors != null && discoveryGroup == null)
{
- sf = (ClientSessionFactoryInternal)staticConnector.connect();
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)staticConnector.connect();
+ addFactory(sf);
+ return sf;
}
// wait for discovery group to get the list of initial connectors
- else
- {
- sf = (ClientSessionFactoryInternal)createSessionFactory();
- }
- addFactory(sf);
- return sf;
+ return (ClientSessionFactoryInternal)createSessionFactory();
}
/* (non-Javadoc)
@@ -1490,7 +1486,7 @@
public ClientSessionFactory connect() throws HornetQException
{
- if (closed)
+ if (closed || closing)
{
throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
}
@@ -1598,6 +1594,7 @@
connectors = new ArrayList<Connector>();
for (TransportConfiguration initialConnector : initialConnectors)
{
+ assertOpen();
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
initialConnector,
callTimeout,
[View Less]
13 years, 2 months
JBoss hornetq SVN: r11589 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-25 08:32:12 -0400 (Tue, 25 Oct 2011)
New Revision: 11589
Modified:
trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
Log:
warnings
Modified: trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java
===================================================================
--- trunk/…
[View More]hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java 2011-10-25 12:31:51 UTC (rev 11588)
+++ trunk/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/MessageServiceManager.java 2011-10-25 12:32:12 UTC (rev 11589)
@@ -1,11 +1,18 @@
package org.hornetq.rest;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.xml.bind.JAXBContext;
+
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ServerLocatorImpl;
-import org.hornetq.core.registry.JndiBindingRegistry;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.rest.queue.DestinationSettings;
@@ -17,14 +24,6 @@
import org.hornetq.rest.util.TimeoutTask;
import org.hornetq.spi.core.naming.BindingRegistry;
-import javax.xml.bind.JAXBContext;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
/**
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
* @version $Revision: 1 $
@@ -142,17 +141,17 @@
HashMap<String, Object> transportConfig = new HashMap<String, Object>();
transportConfig.put(TransportConstants.SERVER_ID_PROP_NAME, configuration.getInVmId());
-
-
+
+
ServerLocator consumerLocator = new ServerLocatorImpl(false, new TransportConfiguration(InVMConnectorFactory.class.getName(), transportConfig));
-
+
if (configuration.getConsumerWindowSize() != -1)
{
consumerLocator.setConsumerWindowSize(configuration.getConsumerWindowSize());
}
ClientSessionFactory consumerSessionFactory = consumerLocator.createSessionFactory();
-
+
ServerLocator defaultLocator = new ServerLocatorImpl(false, new TransportConfiguration(InVMConnectorFactory.class.getName(), transportConfig));
ClientSessionFactory sessionFactory = defaultLocator.createSessionFactory();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-10-25 12:31:51 UTC (rev 11588)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-10-25 12:32:12 UTC (rev 11589)
@@ -235,8 +235,6 @@
Map<String, Object> server2Params = new HashMap<String, Object>();
HornetQServer service2 = createBackupHornetQServer(2, server2Params, isNetty(), 0, nodeManager);
- TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
-
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
[View Less]
13 years, 2 months
JBoss hornetq SVN: r11588 - in trunk/hornetq-core/src: test/java/org/hornetq/core and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-25 08:31:51 -0400 (Tue, 25 Oct 2011)
New Revision: 11588
Added:
trunk/hornetq-core/src/test/java/org/hornetq/core/server/
trunk/hornetq-core/src/test/java/org/hornetq/core/server/impl/
trunk/hornetq-core/src/test/java/org/hornetq/core/server/impl/InVMNodeManager.java
Removed:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/InVMNodeManager.java
Log:
Move test class into test sources
Deleted: trunk/hornetq-core/src/main/java/org/hornetq/…
[View More]core/server/impl/InVMNodeManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/InVMNodeManager.java 2011-10-25 12:31:33 UTC (rev 11587)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/InVMNodeManager.java 2011-10-25 12:31:51 UTC (rev 11588)
@@ -1,150 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.impl;
-
-import static org.hornetq.core.server.impl.InVMNodeManager.State.FAILING_BACK;
-import static org.hornetq.core.server.impl.InVMNodeManager.State.LIVE;
-import static org.hornetq.core.server.impl.InVMNodeManager.State.NOT_STARTED;
-import static org.hornetq.core.server.impl.InVMNodeManager.State.PAUSED;
-
-import java.util.concurrent.Semaphore;
-
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.server.NodeManager;
-import org.hornetq.utils.UUIDGenerator;
-
-/**
- * This is a _mock_ NodeManager and is used only in tests.
- * <p>
- * It allows writing tests without the need to spawn a new JVM.
- *
- * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a> Date: Oct 13, 2010 Time: 3:55:47
- * PM
- */
-public class InVMNodeManager extends NodeManager
-{
-
- private final Semaphore liveLock;
-
- private final Semaphore backupLock;
-
- public enum State {LIVE, PAUSED, FAILING_BACK, NOT_STARTED}
-
- public State state = NOT_STARTED;
-
- public InVMNodeManager()
- {
- liveLock = new Semaphore(1);
- backupLock = new Semaphore(1);
- uuid = UUIDGenerator.getInstance().generateUUID();
- nodeID = new SimpleString(uuid.toString());
- }
-
- @Override
- public void awaitLiveNode() throws Exception
- {
- do
- {
- while (state == NOT_STARTED)
- {
- Thread.sleep(2000);
- }
-
- liveLock.acquire();
-
- if (state == PAUSED)
- {
- liveLock.release();
- Thread.sleep(2000);
- }
- else if (state == FAILING_BACK)
- {
- liveLock.release();
- Thread.sleep(2000);
- }
- else if (state == LIVE)
- {
- break;
- }
- }
- while (true);
- }
-
- @Override
- public void startBackup() throws Exception
- {
- backupLock.acquire();
- }
-
- @Override
- public void startLiveNode() throws Exception
- {
- state = FAILING_BACK;
- liveLock.acquire();
- state = LIVE;
- }
-
- @Override
- public void pauseLiveServer() throws Exception
- {
- state = PAUSED;
- liveLock.release();
- }
-
- @Override
- public void crashLiveServer() throws Exception
- {
- //overkill as already set to live
- state = LIVE;
- liveLock.release();
- }
-
- @Override
- public void stopBackup() throws Exception
- {
- backupLock.release();
- }
-
- @Override
- public void releaseBackup()
- {
- releaseBackupNode();
- }
-
- @Override
- public boolean isAwaitingFailback() throws Exception
- {
- return state == FAILING_BACK;
- }
-
- @Override
- public boolean isBackupLive() throws Exception
- {
- return liveLock.availablePermits() == 0;
- }
-
- @Override
- public void interrupt()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- private void releaseBackupNode()
- {
- if(backupLock != null)
- {
- backupLock.release();
- }
- }
-}
Copied: trunk/hornetq-core/src/test/java/org/hornetq/core/server/impl/InVMNodeManager.java (from rev 11587, trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/InVMNodeManager.java)
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/core/server/impl/InVMNodeManager.java (rev 0)
+++ trunk/hornetq-core/src/test/java/org/hornetq/core/server/impl/InVMNodeManager.java 2011-10-25 12:31:51 UTC (rev 11588)
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.impl;
+
+import static org.hornetq.core.server.impl.InVMNodeManager.State.FAILING_BACK;
+import static org.hornetq.core.server.impl.InVMNodeManager.State.LIVE;
+import static org.hornetq.core.server.impl.InVMNodeManager.State.NOT_STARTED;
+import static org.hornetq.core.server.impl.InVMNodeManager.State.PAUSED;
+
+import java.util.concurrent.Semaphore;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * This is a _mock_ NodeManager and is used only in tests.
+ * <p>
+ * It allows writing tests without the need to spawn a new JVM.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a> Date: Oct 13, 2010 Time: 3:55:47
+ * PM
+ */
+public class InVMNodeManager extends NodeManager
+{
+
+ private final Semaphore liveLock;
+
+ private final Semaphore backupLock;
+
+ public enum State {LIVE, PAUSED, FAILING_BACK, NOT_STARTED}
+
+ public State state = NOT_STARTED;
+
+ public InVMNodeManager()
+ {
+ liveLock = new Semaphore(1);
+ backupLock = new Semaphore(1);
+ uuid = UUIDGenerator.getInstance().generateUUID();
+ nodeID = new SimpleString(uuid.toString());
+ }
+
+ @Override
+ public void awaitLiveNode() throws Exception
+ {
+ do
+ {
+ while (state == NOT_STARTED)
+ {
+ Thread.sleep(2000);
+ }
+
+ liveLock.acquire();
+
+ if (state == PAUSED)
+ {
+ liveLock.release();
+ Thread.sleep(2000);
+ }
+ else if (state == FAILING_BACK)
+ {
+ liveLock.release();
+ Thread.sleep(2000);
+ }
+ else if (state == LIVE)
+ {
+ break;
+ }
+ }
+ while (true);
+ }
+
+ @Override
+ public void startBackup() throws Exception
+ {
+ backupLock.acquire();
+ }
+
+ @Override
+ public void startLiveNode() throws Exception
+ {
+ state = FAILING_BACK;
+ liveLock.acquire();
+ state = LIVE;
+ }
+
+ @Override
+ public void pauseLiveServer() throws Exception
+ {
+ state = PAUSED;
+ liveLock.release();
+ }
+
+ @Override
+ public void crashLiveServer() throws Exception
+ {
+ //overkill as already set to live
+ state = LIVE;
+ liveLock.release();
+ }
+
+ @Override
+ public void stopBackup() throws Exception
+ {
+ backupLock.release();
+ }
+
+ @Override
+ public void releaseBackup()
+ {
+ releaseBackupNode();
+ }
+
+ @Override
+ public boolean isAwaitingFailback() throws Exception
+ {
+ return state == FAILING_BACK;
+ }
+
+ @Override
+ public boolean isBackupLive() throws Exception
+ {
+ return liveLock.availablePermits() == 0;
+ }
+
+ @Override
+ public void interrupt()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ private void releaseBackupNode()
+ {
+ if(backupLock != null)
+ {
+ backupLock.release();
+ }
+ }
+}
[View Less]
13 years, 2 months
JBoss hornetq SVN: r11587 - trunk.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-25 08:31:33 -0400 (Tue, 25 Oct 2011)
New Revision: 11587
Modified:
trunk/pom.xml
Log:
Upgrade surefire plugin to 2.10
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-10-25 12:31:25 UTC (rev 11586)
+++ trunk/pom.xml 2011-10-25 12:31:33 UTC (rev 11587)
@@ -470,7 +470,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<…
[View More]artifactId>maven-surefire-plugin</artifactId>
- <version>2.8.1</version>
+ <version>2.10</version>
<configuration>
<testFailureIgnore>true</testFailureIgnore>
<runOrder>alphabetical</runOrder>
@@ -481,7 +481,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-report-plugin</artifactId>
- <version>2.8.1</version>
+ <version>2.10</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
[View Less]
13 years, 2 months
JBoss hornetq SVN: r11586 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-25 08:31:25 -0400 (Tue, 25 Oct 2011)
New Revision: 11586
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
Fix configuration of replicated backups (Quorum test set-up), clean up code
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
=========================================…
[View More]==========================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-10-25 12:31:02 UTC (rev 11585)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-10-25 12:31:25 UTC (rev 11586)
@@ -189,13 +189,15 @@
{
ConfigurationImpl configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(serverID, false));
configuration.setJournalMinFiles(2);
- configuration.setJournalDirectory(getJournalDir(serverID, false));
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(getDefaultJournalType());
+
+ configuration.setJournalDirectory(getJournalDir(serverID, false));
+ configuration.setBindingsDirectory(getBindingsDir(serverID, false));
configuration.setPagingDirectory(getPageDir(serverID, false));
configuration.setLargeMessagesDirectory(getLargeMessagesDir(serverID, false));
+
configuration.setJournalCompactMinFiles(0);
configuration.setJournalCompactPercentage(0);
configuration.setClusterPassword(CLUSTER_PASSWORD);
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-10-25 12:31:02 UTC (rev 11585)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-10-25 12:31:25 UTC (rev 11586)
@@ -1508,36 +1508,16 @@
throw new IllegalArgumentException("Already a server at node " + node);
}
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig(node);
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
configuration.setSharedStore(sharedStorage);
configuration.setThreadPoolMaxSize(10);
- if (sharedStorage)
- {
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
- }
- else
- {
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
- }
configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
configuration.getAcceptorConfigurations().clear();
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node,
- netty)));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true,
+ generateParams(node, netty)));
HornetQServer server;
@@ -1550,7 +1530,6 @@
else
{
server = HornetQServers.newHornetQServer(configuration);
- server.setIdentity("Server " + node);
}
}
else
@@ -1562,7 +1541,6 @@
else
{
server = HornetQServers.newHornetQServer(configuration, false);
- server.setIdentity("Server " + node);
}
}
@@ -1570,32 +1548,6 @@
servers[node] = server;
}
- protected void setupLiveServer(final int node,
- final boolean fileStorage,
- final boolean sharedStorage,
- final boolean netty,
- NodeManager nodeManager)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
-
- Configuration configuration = createBasicConfig();
-
- configureSomeCommonValues(node, node, sharedStorage, configuration);
- configuration.setJournalCompactMinFiles(0);
-
- configuration.getAcceptorConfigurations().clear();
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node, netty)));
-
- HornetQServer server;
-
- server = createInVMFailoverServer(fileStorage, configuration, nodeManager,node);
-
- servers[node] = server;
- }
-
protected void setupBackupServer(final int node,
final int liveNode,
final boolean fileStorage,
@@ -1607,145 +1559,40 @@
throw new IllegalArgumentException("Already a server at node " + node);
}
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig(sharedStorage ? liveNode : node);
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
configuration.setSharedStore(sharedStorage);
- if (sharedStorage)
- {
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
- configuration.setJournalDirectory(getJournalDir(liveNode, false));
- configuration.setPagingDirectory(getPageDir(liveNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
- }
- else
- {
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
- }
configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
configuration.setBackup(true);
configuration.getAcceptorConfigurations().clear();
+
TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
configuration.getAcceptorConfigurations().add(acceptorConfig);
+
// add backup connector
TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty));
configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
configuration.getConnectorConfigurations().put(backupConfig.getName(), backupConfig);
+ configuration.setLiveConnectorName(liveConfig.getName());
+
HornetQServer server;
- if (fileStorage)
+ if (sharedStorage)
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode], liveNode);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- server.setIdentity("Server " + liveNode);
- }
+ server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode], liveNode);
}
else
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode], liveNode);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- server.setIdentity("Server " + liveNode);
- }
+ boolean enablePersistency = fileStorage ? true : configuration.isPersistenceEnabled();
+ server = HornetQServers.newHornetQServer(configuration, enablePersistency);
}
server.setIdentity(this.getClass().getSimpleName() + "/Backup(" + node + " of live " + liveNode + ")");
servers[node] = server;
}
- private void configureSomeCommonValues(final int node,
- final int liveNode,
- final boolean sharedStorage,
- Configuration configuration)
- {
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
-
- configureCommonValues(node, liveNode, sharedStorage, configuration);
- }
-
- private void configureCommonValues(final int node,
- final int liveNode,
- final boolean sharedStorage,
- Configuration configuration)
- {
- configuration.setSharedStore(sharedStorage);
- configuration.setClustered(true);
- if (sharedStorage)
- {
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
- configuration.setJournalDirectory(getJournalDir(liveNode, false));
- configuration.setPagingDirectory(getPageDir(liveNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
- }
- else
- {
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
- }
- }
-
- protected void setupBackupServer(final int node,
- final int liveNode,
- final boolean fileStorage,
- final boolean sharedStorage,
- final boolean netty,
- NodeManager nodeManager)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
-
- Configuration configuration = createBasicConfig();
-
- configureSomeCommonValues(node, liveNode, sharedStorage, configuration);
- configuration.setJournalCompactMinFiles(0);
- configuration.setBackup(true);
-
- // add acceptor
- configuration.getAcceptorConfigurations().clear();
- TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
- configuration.getAcceptorConfigurations().add(acceptorConfig);
-
-
- // add backup connector
- TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty));
- configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
- TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
- configuration.getConnectorConfigurations().put(backupConfig.getName(), backupConfig);
-
- configuration.setLiveConnectorName(liveConfig.getName());
-
- servers[node] = createInVMFailoverServer(fileStorage, configuration, nodeManager, node);
- }
-
protected void setupLiveServerWithDiscovery(final int node,
final String groupAddress,
final int port,
@@ -1758,17 +1605,9 @@
throw new IllegalArgumentException("Already a server at node " + node);
}
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig(node);
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
configuration.setJournalMaxIO_AIO(1000);
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
configuration.setClustered(true);
configuration.setBackup(false);
@@ -1844,25 +1683,9 @@
throw new IllegalArgumentException("Already a server at node " + node);
}
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig(sharedStorage ? liveNode : node);
- configuration.setSecurityEnabled(false);
configuration.setSharedStore(sharedStorage);
- if (sharedStorage)
- {
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
- configuration.setJournalDirectory(getJournalDir(liveNode, false));
- configuration.setPagingDirectory(getPageDir(liveNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
- }
- else
- {
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
- }
configuration.setClustered(true);
configuration.setBackup(true);
@@ -1898,28 +1721,14 @@
configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
HornetQServer server;
- if (fileStorage)
+ if (sharedStorage)
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode], liveNode);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- server.setIdentity("Server " + liveNode);
- }
+ server = createInVMFailoverServer(fileStorage, configuration, nodeManagers[liveNode], liveNode);
}
else
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode], liveNode);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
+ boolean enablePersistency = fileStorage ? configuration.isPersistenceEnabled() : false;
+ server = HornetQServers.newHornetQServer(configuration, enablePersistency);
}
servers[node] = server;
}
[View Less]
13 years, 2 months
JBoss hornetq SVN: r11585 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-25 08:31:02 -0400 (Tue, 25 Oct 2011)
New Revision: 11585
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
First start all servers, and then wait for isStarted()
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
========================================================…
[View More]===========
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-10-25 12:30:43 UTC (rev 11584)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-10-25 12:31:02 UTC (rev 11585)
@@ -262,22 +262,22 @@
{
if (acceptor)
{
- className = NettyAcceptorFactory.class.getName();
+ className = NETTY_ACCEPTOR_FACTORY;
}
else
{
- className = NettyConnectorFactory.class.getName();
+ className = NETTY_CONNECTOR_FACTORY;
}
}
else
{
if (acceptor)
{
- className = InVMAcceptorFactory.class.getName();
+ className = INVM_ACCEPTOR_FACTORY;
}
else
{
- className = InVMConnectorFactory.class.getName();
+ className = INVM_CONNECTOR_FACTORY;
}
}
return new TransportConfiguration(className, params);
@@ -309,13 +309,13 @@
long timetowait = System.currentTimeMillis() + 5000;
while (!server.isStarted() && System.currentTimeMillis() < timetowait)
{
- Thread.sleep(100);
+ Thread.sleep(50);
}
if (!server.isStarted())
{
log.info(threadDump("Server didn't start"));
- fail("server didnt start");
+ fail("server didnt start: " + server);
}
if (!server.getConfiguration().isBackup())
@@ -323,12 +323,12 @@
timetowait = System.currentTimeMillis() + 5000;
while (!server.isInitialised() && System.currentTimeMillis() < timetowait)
{
- Thread.sleep(100);
+ Thread.sleep(50);
}
if (!server.isInitialised())
{
- fail("Server didn't initialize");
+ fail("Server didn't initialize: " + server);
}
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-10-25 12:30:43 UTC (rev 11584)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-10-25 12:31:02 UTC (rev 11585)
@@ -2189,9 +2189,11 @@
log.info("started server " + servers[node]);
log.info("started server " + node);
+ }
+ for (int node : nodes)
+ {
waitForServer(servers[node]);
-
}
}
[View Less]
13 years, 2 months