[jboss-cvs] JBossAS SVN: r105665 - projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 3 12:21:57 EDT 2010
Author: bstansberry at jboss.com
Date: 2010-06-03 12:21:56 -0400 (Thu, 03 Jun 2010)
New Revision: 105665
Added:
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ChannelSource.java
Modified:
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java
Log:
[JBCLUSTER-276] Fix state transfer bugs found in integration testing
Improve how the Cache-created Channel is integrated
Added: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ChannelSource.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ChannelSource.java (rev 0)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ChannelSource.java 2010-06-03 16:21:56 UTC (rev 105665)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc., and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.core.framework.server;
+
+import org.jgroups.Channel;
+
+/**
+ * An object that provides a reference to a JGroups Channel.
+ *
+ * @author Brian Stansberry
+ * @version $Revision$
+ */
+public interface ChannelSource
+{
+ Channel getChannel();
+}
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java 2010-06-03 16:21:31 UTC (rev 105664)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java 2010-06-03 16:21:56 UTC (rev 105665)
@@ -150,9 +150,11 @@
// Attributes ----------------------------------------------------
@SuppressWarnings("deprecation")
- private org.jgroups.ChannelFactory channelFactory;
- private String stackName;
- private String groupName;
+ private org.jgroups.ChannelFactory channelFactory;
+ private ChannelSource channelSource;
+ private boolean channelInjected = true;
+ private String stackName;
+ private String groupName;
private boolean channelSelfConnected;
@@ -785,27 +787,28 @@
{
RunnableFuture<SerializableStateTransferResult> future = null;
StateTransferTask<?, ?> task = stateTransferTasks.get(serviceName);
- if (task == null)
+ if (task == null || (task.result != null && !task.result.stateReceived()))
{
SerializableStateTransferTask newTask = new SerializableStateTransferTask(serviceName, classloader);
stateTransferTasks.put(serviceName, newTask);
future = new FutureTask<SerializableStateTransferResult>(newTask);
- Executor e = getThreadPool();
- if (e == null)
- {
- e = Executors.newSingleThreadExecutor();
- }
- e.execute(future);
}
else if (task instanceof SerializableStateTransferTask)
{
// Unlikely scenario
+ log.warn("Received concurrent requests to get service state for " + serviceName);
future = new FutureTask<SerializableStateTransferResult>((SerializableStateTransferTask) task);
}
else
{
throw new IllegalStateException("State transfer task for " + serviceName + " that will return an input stream is already pending");
}
+ Executor e = getThreadPool();
+ if (e == null)
+ {
+ e = Executors.newSingleThreadExecutor();
+ }
+ e.execute(future);
return future;
}
@@ -818,27 +821,28 @@
{
RunnableFuture<StreamStateTransferResult> future = null;
StateTransferTask<?, ?> task = stateTransferTasks.get(serviceName);
- if (task == null)
+ if (task == null || (task.result != null && !task.result.stateReceived()))
{
StreamStateTransferTask newTask = new StreamStateTransferTask(serviceName);
stateTransferTasks.put(serviceName, newTask);
future = new FutureTask<StreamStateTransferResult>(newTask);
- Executor e = getThreadPool();
- if (e == null)
- {
- e = Executors.newSingleThreadExecutor();
- }
- e.execute(future);
}
else if (task instanceof StreamStateTransferTask)
{
// Unlikely scenario
+ log.warn("Received concurrent requests to get service state for " + serviceName);
future = new FutureTask<StreamStateTransferResult>((StreamStateTransferTask) task);
}
else
{
throw new IllegalStateException("State transfer task for " + serviceName + " that will return an deserialized object is already pending");
}
+ Executor e = getThreadPool();
+ if (e == null)
+ {
+ e = Executors.newSingleThreadExecutor();
+ }
+ e.execute(future);
return future;
}
@@ -916,6 +920,16 @@
{
return Version.description + "( " + Version.cvs + ")";
}
+
+ public ChannelSource getChannelSource()
+ {
+ return this.channelSource;
+ }
+
+ public void setChannelSource(ChannelSource source)
+ {
+ this.channelSource = source;
+ }
@SuppressWarnings("deprecation")
public org.jgroups.ChannelFactory getChannelFactory()
@@ -1021,10 +1035,8 @@
state = STOPPING;
try
{
- this.log.info("Stopping partition " + this.getGroupName());
stopService();
state = STOPPED;
- this.log.info("Partition " + this.getGroupName() + " stopped.");
}
catch (InterruptedException e)
{
@@ -1098,6 +1110,14 @@
this.stateIdPrefix = getClass().getName() + "." + this.scopeId + ".";
+ if (this.channel == null)
+ {
+ this.channelInjected = false;
+ if (this.channelSource != null)
+ {
+ this.channel = this.channelSource.getChannel();
+ }
+ }
if (this.channel == null || !this.channel.isOpen())
{
@@ -1106,6 +1126,7 @@
this.channel = this.createChannel();
}
+
// Subscribe to events generated by the channel
MembershipListener meml = new MembershipListenerImpl();
MessageListener msgl = this.stateIdPrefix == null ? null : new MessageListenerImpl();
@@ -1158,29 +1179,28 @@
{
this.channelSelfConnected = false;
this.channel.disconnect();
+ this.channel.close();
}
}
catch (Exception e)
{
this.log.error("channel disconnection failed", e);
}
- }
-
- protected void destroyService()
- {
- try
+ finally
{
- if (this.channelSelfConnected && this.channel != null && this.channel.isOpen())
+ if (!this.channelInjected)
{
- this.channel.close();
+ // We obtained/created this channel ourselves (it was not injected); drop it so it is re-acquired next time
+ this.channel = null;
}
}
- catch (Exception e)
- {
- this.log.error("Closing channel failed", e);
- }
}
+ protected void destroyService()
+ {
+ // no-op
+ }
+
@SuppressWarnings("deprecation")
protected Channel createChannel()
{
@@ -2361,6 +2381,8 @@
V state;
private boolean isStateSet;
private Exception setStateException;
+ private T result;
+ private final Object callMutex = new Object();
StateTransferTask(String serviceName)
{
@@ -2369,82 +2391,91 @@
public T call() throws Exception
{
- boolean intr = false;
- boolean rc = false;
- try
+ synchronized (callMutex)
{
- long start, stop;
- this.isStateSet = false;
- start = System.currentTimeMillis();
- String state_id = CoreGroupCommunicationService.this.stateIdPrefix + serviceName;
- rc = CoreGroupCommunicationService.this.getChannel().getState(null, state_id, CoreGroupCommunicationService.this.getStateTransferTimeout());
- if (rc)
+ if (result != null)
{
- synchronized (this)
+ return result;
+ }
+
+ boolean intr = false;
+ boolean rc = false;
+ try
+ {
+ long start, stop;
+ this.isStateSet = false;
+ start = System.currentTimeMillis();
+ String state_id = CoreGroupCommunicationService.this.stateIdPrefix + serviceName;
+ rc = CoreGroupCommunicationService.this.getChannel().getState(null, state_id, CoreGroupCommunicationService.this.getStateTransferTimeout());
+ if (rc)
{
- while (!this.isStateSet)
+ synchronized (this)
{
- if (this.setStateException != null)
+ while (!this.isStateSet)
{
- throw this.setStateException;
+ if (this.setStateException != null)
+ {
+ throw this.setStateException;
+ }
+
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException iex)
+ {
+ intr = true;
+ }
}
-
- try
- {
- wait();
- }
- catch (InterruptedException iex)
- {
- intr = true;
- }
}
+ stop = System.currentTimeMillis();
+ CoreGroupCommunicationService.this.log.debug("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
}
- stop = System.currentTimeMillis();
- CoreGroupCommunicationService.this.log.debug("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
- }
- else
- {
- // No one provided us with serviceState.
- // We need to find out if we are the coordinator, so we must
- // block until viewAccepted() is called at least once
-
- synchronized (CoreGroupCommunicationService.this.channelLock)
+ else
{
- while (CoreGroupCommunicationService.this.getCurrentView().size() == 0)
+ // No one provided us with serviceState.
+ // We need to find out if we are the coordinator, so we must
+ // block until viewAccepted() is called at least once
+
+ synchronized (CoreGroupCommunicationService.this.channelLock)
{
- CoreGroupCommunicationService.this.log.debug("waiting on viewAccepted()");
- try
+ while (CoreGroupCommunicationService.this.getCurrentView().size() == 0)
{
- CoreGroupCommunicationService.this.channelLock.wait();
+ CoreGroupCommunicationService.this.log.debug("waiting on viewAccepted()");
+ try
+ {
+ CoreGroupCommunicationService.this.channelLock.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ intr = true;
+ }
}
- catch (InterruptedException iex)
- {
- intr = true;
- }
}
+
+ if (CoreGroupCommunicationService.this.isCurrentNodeCoordinator())
+ {
+ CoreGroupCommunicationService.this.log.debug("State could not be retrieved for service " + serviceName + " (we are the first member in group)");
+ }
+ else
+ {
+ throw new IllegalStateException("Initial serviceState transfer failed: " +
+ "Channel.getState() returned false");
+ }
}
-
- if (CoreGroupCommunicationService.this.isCurrentNodeCoordinator())
- {
- CoreGroupCommunicationService.this.log.debug("State could not be retrieved for service " + serviceName + " (we are the first member in group)");
- }
- else
- {
- throw new IllegalStateException("Initial serviceState transfer failed: " +
- "Channel.getState() returned false");
- }
+
+ result = createStateTransferResult(rc, state, null);
}
+ catch (Exception e)
+ {
+ result = createStateTransferResult(rc, null, e);
+ }
+ finally
+ {
+ if (intr) Thread.currentThread().interrupt();
+ }
+ return result;
}
- catch (Exception e)
- {
- return createStateTransferResult(rc, null, e);
- }
- finally
- {
- if (intr) Thread.currentThread().interrupt();
- }
-
- return createStateTransferResult(rc, state, null);
}
protected abstract T createStateTransferResult(boolean gotState, V state, Exception exception);
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java 2010-06-03 16:21:31 UTC (rev 105664)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java 2010-06-03 16:21:56 UTC (rev 105665)
@@ -35,7 +35,7 @@
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.SerializableStateTransferResult;
import org.jboss.ha.framework.interfaces.StateTransferProvider;
-//import org.jboss.ha.framework.server.spi.ManagedDistributedState;
+import org.jboss.ha.framework.server.spi.ManagedDistributedState;
/**
* Extends {@link CoreGroupCommunicationService} to add implemenation of the
@@ -172,6 +172,17 @@
this.log.info("Initializing partition " + this.getPartitionName());
this.logHistory ("Initializing partition " + this.getPartitionName());
+ if (this.distributedState instanceof ManagedDistributedState)
+ {
+ ((ManagedDistributedState) this.distributedState).createService();
+ }
+
+ if (getChannelSource() == null && distributedState instanceof ChannelSource)
+ {
+ log.debug("Using " + distributedState + " as a " + ChannelSource.class.getSimpleName());
+ setChannelSource((ChannelSource) distributedState);
+ }
+
super.createService();
if (this.replicantManager == null)
{
@@ -182,11 +193,6 @@
this.replicantManager.createService();
-// if (this.distributedState instanceof ManagedDistributedState)
-// {
-// ((ManagedDistributedState) this.distributedState).createService();
-// }
-
this.log.debug("done initializing partition " + this.getPartitionName());
}
@@ -195,16 +201,18 @@
{
this.logHistory ("Starting partition " + this.getPartitionName());
+ // Start the DistributedState first: starting it starts its cache, which
+ // wants to connect the channel itself before super.startService() tries to
+ if (this.distributedState instanceof ManagedDistributedState)
+ {
+ ((ManagedDistributedState) this.distributedState).startService();
+ }
+
super.startService();
this.fetchInitialState();
this.replicantManager.startService();
-
-// if (this.distributedState instanceof ManagedDistributedState)
-// {
-// ((ManagedDistributedState) this.distributedState).startService();
-// }
}
@Override
@@ -212,15 +220,15 @@
{
this.logHistory ("Stopping partition");
this.log.info("Stopping partition " + this.getPartitionName());
-
-// if (this.distributedState instanceof ManagedDistributedState)
-// {
-// ((ManagedDistributedState) this.distributedState).stopService();
-// }
this.replicantManager.stopService();
super.stopService();
+
+ if (this.distributedState instanceof ManagedDistributedState)
+ {
+ ((ManagedDistributedState) this.distributedState).stopService();
+ }
this.log.info("Partition " + this.getPartitionName() + " stopped.");
}
@@ -230,30 +238,40 @@
{
this.log.debug("Destroying HAPartition: " + this.getPartitionName());
- @SuppressWarnings("deprecation")
- String svc = org.jboss.ha.framework.interfaces.DistributedState.class.getSimpleName();
try
- {
-// if (this.distributedState instanceof ManagedDistributedState)
-// {
-// ((ManagedDistributedState) this.distributedState).destroyService();
-// }
- svc = DistributedReplicantManager.class.getSimpleName();
+ {
this.replicantManager.destroyService();
// unregisterDRM();
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
- this.log.error("Destroying " + svc + " failed", e);
+ this.log.error("Destroying " + DistributedReplicantManager.class.getSimpleName() + " failed", e);
}
catch (Exception e)
{
- this.log.error("Destroying " + svc + " failed", e);
+ this.log.error("Destroying " + DistributedReplicantManager.class.getSimpleName() + " failed", e);
}
super.destroyService();
+ try
+ {
+ if (this.distributedState instanceof ManagedDistributedState)
+ {
+ ((ManagedDistributedState) this.distributedState).destroyService();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ this.log.error("Destroying " + DistributedReplicantManager.class.getSimpleName() + " failed", e);
+ }
+ catch (Exception e)
+ {
+ this.log.error("Destroying " + DistributedReplicantManager.class.getSimpleName() + " failed", e);
+ }
+
this.log.info("Partition " + this.getPartitionName() + " destroyed.");
}
More information about the jboss-cvs-commits
mailing list