[jboss-cvs] JBossAS SVN: r104449 - trunk/cluster/src/main/java/org/jboss/ha/framework/server.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue May 4 13:34:17 EDT 2010
Author: bstansberry at jboss.com
Date: 2010-05-04 13:34:16 -0400 (Tue, 04 May 2010)
New Revision: 104449
Modified:
trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java
Log:
[JBAS-3594] First crack at implementation
Modified: trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java 2010-05-04 17:03:12 UTC (rev 104448)
+++ trunk/cluster/src/main/java/org/jboss/ha/framework/server/ClusterPartition.java 2010-05-04 17:34:16 UTC (rev 104449)
@@ -28,20 +28,26 @@
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.ref.WeakReference;
+import java.security.AccessController;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
@@ -80,6 +86,8 @@
import org.jboss.managed.api.annotation.ViewUse;
import org.jboss.naming.NonSerializableFactory;
import org.jboss.system.ServiceMBeanSupport;
+import org.jboss.util.loading.ContextClassLoaderSwitcher;
+import org.jboss.util.loading.ContextClassLoaderSwitcher.SwitchContext;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.Event;
@@ -284,9 +292,14 @@
private HAPartitionDependencyCreator haPartitionDependencyCreator;
private KernelControllerContext kernelControllerContext;
+ private final Map<String, StateTransferTask> stateTransferTasks = new Hashtable<String, StateTransferTask>();
+
+ @SuppressWarnings("unchecked")
+ private final ContextClassLoaderSwitcher classLoaderSwitcher = (ContextClassLoaderSwitcher) AccessController.doPrivileged(ContextClassLoaderSwitcher.INSTANTIATOR);
+
// Static --------------------------------------------------------
- // Constructors --------------------------------------------------
+ // Constructors --------------------------------------------------
public ClusterPartition()
{
@@ -1473,7 +1486,22 @@
+   /**
+    * Starts an asynchronous transfer of the named service's state from the cluster.
+    * <p>
+    * A {@link StateTransferTask} is registered under <code>serviceName</code> so that the
+    * <code>setState(String, ...)</code> callbacks, which look tasks up by that name, can
+    * hand the received state to it.
+    *
+    * @param serviceName name of the service whose state is wanted; used as the state id
+    * @param classloader classloader passed on to the task for unmarshalling; may be <code>null</code>
+    * @return a future that completes with the transferred state
+    */
public Future<Serializable> getServiceState(String serviceName, ClassLoader classloader)
{
- throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+      StateTransferTask task = stateTransferTasks.get(serviceName);
+      if (task == null)
+      {
+         task = new StateTransferTask(serviceName, classloader);
+         stateTransferTasks.put(serviceName, task);
+      }
+      // else: unlikely scenario - a task is already registered for this service; reuse it
+
+      // The future has to be executed in BOTH cases. A FutureTask that is never run
+      // never completes, so a caller blocking in Future.get() would hang forever.
+      RunnableFuture<Serializable> future = new FutureTask<Serializable>(task);
+      Executor pool = threadPool;
+      if (pool != null)
+      {
+         pool.execute(future);
+      }
+      else
+      {
+         // No shared pool configured: use a throwaway executor. shutdown() still lets the
+         // queued task run to completion, then allows the worker thread to terminate.
+         java.util.concurrent.ExecutorService adHoc = Executors.newSingleThreadExecutor();
+         try
+         {
+            adHoc.execute(future);
+         }
+         finally
+         {
+            adHoc.shutdown();
+         }
+      }
+      return future;
}
public Future<Serializable> getServiceState(String serviceName)
@@ -2159,12 +2187,81 @@
+      /**
+       * Writes the current state of the service registered under <code>state_id</code>
+       * to the given stream in marshalled form. Nothing is written if no provider is
+       * registered for that id. Marshalling failures are logged, not propagated.
+       *
+       * @param state_id id of the service whose state is requested
+       * @param ostream  stream the marshalled state is written to; closed after writing
+       */
public void getState(String state_id, OutputStream ostream)
{
- throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+         // FIXME refactor to share logic
+         ClusterPartition.this.log.debug("getState called for service " + state_id);
+
+         StateTransferProvider source = stateProviders.get(state_id);
+         if (source == null)
+         {
+            // No provider registered under this id
+            return;
+         }
+
+         // FIXME add a streaming api to StateTransferProvider
+         Object snapshot = source.getCurrentState();
+         MarshalledValueOutputStream out = null;
+         try
+         {
+            out = new MarshalledValueOutputStream(ostream);
+            out.writeObject(snapshot);
+         }
+         catch (Exception ex)
+         {
+            ClusterPartition.this.log.error("getState failed for service " + state_id, ex);
+         }
+         finally
+         {
+            if (out != null)
+            {
+               try
+               {
+                  out.flush();
+                  out.close();
+               }
+               catch (IOException ignored)
+               {
+                  log.debug("Caught exception closing stream used for marshalling state", ignored);
+               }
+            }
+         }
}
+      /**
+       * Returns the current state of the service registered under <code>state_id</code>
+       * as a marshalled byte array.
+       *
+       * @param state_id id of the service whose state is requested
+       * @return the marshalled state, or <code>null</code> if no provider is registered
+       *         or marshalling failed
+       */
public byte[] getState(String state_id)
{
- throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+         ClusterPartition.this.log.debug("getState called for service " + state_id);
+
+         StateTransferProvider source = stateProviders.get(state_id);
+         if (source == null)
+         {
+            return null; // This will cause the receiver to get a "false" on the channel.getState() call
+         }
+
+         Object snapshot = source.getCurrentState();
+         MarshalledValueOutputStream out = null;
+         try
+         {
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024);
+            out = new MarshalledValueOutputStream(buffer);
+            out.writeObject(snapshot);
+            out.flush();
+            out.close();
+            return buffer.toByteArray();
+         }
+         catch (Exception ex)
+         {
+            ClusterPartition.this.log.error("getState failed for service " + state_id, ex);
+         }
+         finally
+         {
+            if (out != null)
+            {
+               try
+               {
+                  out.close();
+               }
+               catch (IOException ignored)
+               {
+                  log.debug("Caught exception closing stream used for marshalling state", ignored);
+               }
+            }
+         }
+
+         // Reached only when marshalling failed
+         return null; // This will cause the receiver to get a "false" on the channel.getState() call
}
public void setState(InputStream stream)
@@ -2214,12 +2311,30 @@
+      /**
+       * Hands a received state, in byte form, to the transfer task registered for the
+       * service. Logs a warning if no task is registered under <code>state_id</code>.
+       *
+       * @param state_id id of the service the state belongs to
+       * @param state    the marshalled state
+       */
public void setState(String state_id, byte[] state)
{
- throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+         StateTransferTask receiver = ClusterPartition.this.stateTransferTasks.get(state_id);
+         if (receiver != null)
+         {
+            receiver.setState(state);
+            return;
+         }
+
+         ClusterPartition.this.log.warn("No " + StateTransferTask.class.getSimpleName() +
+               " registered to receive state for service " + state_id);
}
+      /**
+       * Hands a received state, in stream form, to the transfer task registered for the
+       * service. Logs a warning if no task is registered under <code>state_id</code>.
+       *
+       * @param state_id id of the service the state belongs to
+       * @param istream  stream from which the marshalled state can be read
+       */
public void setState(String state_id, InputStream istream)
{
- throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+         StateTransferTask receiver = ClusterPartition.this.stateTransferTasks.get(state_id);
+         if (receiver != null)
+         {
+            receiver.setState(istream);
+            return;
+         }
+
+         ClusterPartition.this.log.warn("No " + StateTransferTask.class.getSimpleName() +
+               " registered to receive state for service " + state_id);
}
public void receive(org.jgroups.Message msg)
@@ -2619,203 +2734,203 @@
}
}
-// private class StateTransferTask implements Callable<Serializable>
-// {
-// private final String serviceName;
-// private final WeakReference<ClassLoader> classloader;
-// private Serializable result;
-// private boolean isStateSet;
-// private Exception setStateException;
-//
-// StateTransferTask(String serviceName, ClassLoader cl)
-// {
-// this.serviceName = serviceName;
-// if (cl != null)
-// {
-// classloader = null;
-// }
-// else
-// {
-// classloader = new WeakReference<ClassLoader>(cl);
-// }
-// }
-//
-// public Serializable call() throws Exception
-// {
-// boolean intr = false;
-// try
-// {
-// long start, stop;
-// this.isStateSet = false;
-// start = System.currentTimeMillis();
-// boolean rc = ClusterPartition.this.channel.getState(null, serviceName, ClusterPartition.this.getStateTransferTimeout());
-// if (rc)
-// {
-// synchronized (this)
-// {
-// while (!this.isStateSet)
-// {
-// if (this.setStateException != null)
-// {
-// throw this.setStateException;
-// }
-//
-// try
-// {
-// wait();
-// }
-// catch (InterruptedException iex)
-// {
-// intr = true;
-// }
-// }
-// }
-// stop = System.currentTimeMillis();
-// ClusterPartition.this.log.debug("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
-// }
-// else
-// {
-// // No one provided us with serviceState.
-// // We need to find out if we are the coordinator, so we must
-// // block until viewAccepted() is called at least once
-//
-// synchronized (ClusterPartition.this.members)
-// {
-// while (ClusterPartition.this.members.size() == 0)
-// {
-// ClusterPartition.this.log.debug("waiting on viewAccepted()");
-// try
-// {
-// ClusterPartition.this.members.wait();
-// }
-// catch (InterruptedException iex)
-// {
-// intr = true;
-// }
-// }
-// }
-//
-// if (ClusterPartition.this.isCurrentNodeCoordinator())
-// {
-// ClusterPartition.this.log.debug("State could not be retrieved for service " + serviceName + " (we are the first member in group)");
-// }
-// else
-// {
-// throw new IllegalStateException("Initial serviceState transfer failed: " +
-// "Channel.getState() returned false");
-// }
-// }
-// }
-// finally
-// {
-// if (intr) Thread.currentThread().interrupt();
-// }
-//
-// return result;
-// }
-//
-// void setState(byte[] state)
-// {
-// try
-// {
-// if (state == null)
-// {
-// ClusterPartition.this.log.debug("transferred state for service " +
-// serviceName + " is null (may be first member in cluster)");
-// }
-// else
-// {
-// ByteArrayInputStream bais = new ByteArrayInputStream(state);
-// setStateInternal(bais);
-// bais.close();
-// }
-//
-// this.isStateSet = true;
-// }
-// catch (Throwable t)
-// {
-// recordSetStateFailure(t);
-// }
-// finally
-// {
-// // Notify waiting thread that serviceState has been set.
-// synchronized(this)
-// {
-// notifyAll();
-// }
-// }
-// }
-//
-// void setState(InputStream state)
-// {
-// try
-// {
-// if (state == null)
-// {
-// ClusterPartition.this.log.debug("transferred state for service " +
-// serviceName + " is null (may be first member in cluster)");
-// }
-// else
-// {
-// setStateInternal(state);
-// }
-//
-// this.isStateSet = true;
-// }
-// catch (Throwable t)
-// {
-// recordSetStateFailure(t);
-// }
-// finally
-// {
-// // Notify waiting thread that serviceState has been set.
-// synchronized(this)
-// {
-// notifyAll();
-// }
-// }
-//
-// }
-//
-// private void setStateInternal(InputStream is) throws IOException, ClassNotFoundException
-// {
-// ClassLoader cl = getStateTransferClassLoader();
-// SwitchContext switchContext = ClusterPartition.this.classLoaderSwitcher.getSwitchContext(cl);
-// try
-// {
-// MarshalledValueInputStream mvis = new MarshalledValueInputStream(is);
-// this.result = (Serializable) mvis.readObject();
-// }
-// finally
-// {
-// switchContext.reset();
-// }
-// }
-//
-// private void recordSetStateFailure(Throwable t)
-// {
-// ClusterPartition.this.log.error("failed setting serviceState for service " + serviceName, t);
-// if (t instanceof Exception)
-// {
-// this.setStateException = (Exception) t;
-// }
-// else
-// {
-// this.setStateException = new Exception(t);
-// }
-// }
-//
-// private ClassLoader getStateTransferClassLoader()
-// {
-// ClassLoader cl = classloader == null ? null : classloader.get();
-// if (cl == null)
-// {
-// cl = this.getClass().getClassLoader();
-// }
-// return cl;
-// }
-//
-// }
+   /**
+    * Callable that requests the state of a single service from the channel and
+    * returns it once one of the <code>setState</code> methods has delivered it.
+    * <p>
+    * {@link #call()} runs on the requesting thread, while <code>setState</code> is invoked
+    * by whichever thread delivers the state. The two sides coordinate through this
+    * object's monitor.
+    */
+   private class StateTransferTask implements Callable<Serializable>
+   {
+      private final String serviceName;
+      /** Caller-supplied classloader, held weakly; <code>null</code> if none was supplied. */
+      private final WeakReference<ClassLoader> classloader;
+      /** The unmarshalled state; stays <code>null</code> if no state was delivered. */
+      private Serializable result;
+      /** Set once a setState call has completed without error. */
+      private boolean isStateSet;
+      /** Failure recorded by a setState call; rethrown from {@link #call()}. */
+      private Exception setStateException;
+
+      /**
+       * @param serviceName name of the service whose state is transferred
+       * @param cl          classloader to use when unmarshalling the state; may be
+       *                    <code>null</code>, in which case this class's loader is used
+       */
+      StateTransferTask(String serviceName, ClassLoader cl)
+      {
+         this.serviceName = serviceName;
+         // Keep a (weak) reference only when a loader was actually supplied. The
+         // condition used to be inverted, which discarded every supplied loader.
+         if (cl == null)
+         {
+            classloader = null;
+         }
+         else
+         {
+            classloader = new WeakReference<ClassLoader>(cl);
+         }
+      }
+
+      /**
+       * Requests the state from the channel. If the channel reports success, waits until
+       * a <code>setState</code> call has stored the state or recorded a failure.
+       *
+       * @return the transferred state; <code>null</code> if no state was available and
+       *         this node is the coordinator, or if the delivered state was null
+       * @throws Exception the failure recorded while setting the state, or an
+       *         {@link IllegalStateException} if the channel provided no state and this
+       *         node is not the coordinator
+       */
+      public Serializable call() throws Exception
+      {
+         boolean intr = false;
+         try
+         {
+            long start, stop;
+            this.isStateSet = false;
+            start = System.currentTimeMillis();
+            boolean rc = ClusterPartition.this.channel.getState(null, serviceName, ClusterPartition.this.getStateTransferTimeout());
+            if (rc)
+            {
+               synchronized (this)
+               {
+                  while (!this.isStateSet)
+                  {
+                     if (this.setStateException != null)
+                     {
+                        throw this.setStateException;
+                     }
+
+                     try
+                     {
+                        wait();
+                     }
+                     catch (InterruptedException iex)
+                     {
+                        // Remember the interrupt and keep waiting; it is re-asserted in the finally block
+                        intr = true;
+                     }
+                  }
+               }
+               stop = System.currentTimeMillis();
+               ClusterPartition.this.log.debug("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
+            }
+            else
+            {
+               // No one provided us with serviceState.
+               // We need to find out if we are the coordinator, so we must
+               // block until viewAccepted() is called at least once
+
+               synchronized (ClusterPartition.this.members)
+               {
+                  while (ClusterPartition.this.members.size() == 0)
+                  {
+                     ClusterPartition.this.log.debug("waiting on viewAccepted()");
+                     try
+                     {
+                        ClusterPartition.this.members.wait();
+                     }
+                     catch (InterruptedException iex)
+                     {
+                        intr = true;
+                     }
+                  }
+               }
+
+               if (ClusterPartition.this.isCurrentNodeCoordinator())
+               {
+                  ClusterPartition.this.log.debug("State could not be retrieved for service " + serviceName + " (we are the first member in group)");
+               }
+               else
+               {
+                  throw new IllegalStateException("Initial serviceState transfer failed: " +
+                        "Channel.getState() returned false");
+               }
+            }
+         }
+         finally
+         {
+            if (intr) Thread.currentThread().interrupt();
+         }
+
+         return result;
+      }
+
+      /**
+       * Accepts the transferred state in byte form, unmarshals it and wakes up the
+       * thread waiting in {@link #call()}. Failures are recorded, not thrown.
+       *
+       * @param state the marshalled state; <code>null</code> is treated as "no state"
+       */
+      void setState(byte[] state)
+      {
+         try
+         {
+            if (state == null)
+            {
+               ClusterPartition.this.log.debug("transferred state for service " +
+                     serviceName + " is null (may be first member in cluster)");
+            }
+            else
+            {
+               ByteArrayInputStream bais = new ByteArrayInputStream(state);
+               setStateInternal(bais);
+               bais.close();
+            }
+
+            this.isStateSet = true;
+         }
+         catch (Throwable t)
+         {
+            recordSetStateFailure(t);
+         }
+         finally
+         {
+            // Notify waiting thread that serviceState has been set.
+            synchronized(this)
+            {
+               notifyAll();
+            }
+         }
+      }
+
+      /**
+       * Accepts the transferred state in stream form, unmarshals it and wakes up the
+       * thread waiting in {@link #call()}. Failures are recorded, not thrown.
+       *
+       * @param state stream to read the marshalled state from; <code>null</code> is
+       *              treated as "no state"
+       */
+      void setState(InputStream state)
+      {
+         try
+         {
+            if (state == null)
+            {
+               ClusterPartition.this.log.debug("transferred state for service " +
+                     serviceName + " is null (may be first member in cluster)");
+            }
+            else
+            {
+               setStateInternal(state);
+            }
+
+            this.isStateSet = true;
+         }
+         catch (Throwable t)
+         {
+            recordSetStateFailure(t);
+         }
+         finally
+         {
+            // Notify waiting thread that serviceState has been set.
+            synchronized(this)
+            {
+               notifyAll();
+            }
+         }
+
+      }
+
+      /**
+       * Reads one object from the stream and stores it as the result, with the
+       * state-transfer classloader switched in for the duration of the read.
+       */
+      private void setStateInternal(InputStream is) throws IOException, ClassNotFoundException
+      {
+         ClassLoader cl = getStateTransferClassLoader();
+         SwitchContext switchContext = ClusterPartition.this.classLoaderSwitcher.getSwitchContext(cl);
+         try
+         {
+            MarshalledValueInputStream mvis = new MarshalledValueInputStream(is);
+            this.result = (Serializable) mvis.readObject();
+         }
+         finally
+         {
+            // Always undo the classloader switch, even if unmarshalling failed
+            switchContext.reset();
+         }
+      }
+
+      /**
+       * Logs the failure and stores it so that {@link #call()} can rethrow it.
+       * Non-Exception throwables are wrapped in an Exception.
+       */
+      private void recordSetStateFailure(Throwable t)
+      {
+         ClusterPartition.this.log.error("failed setting serviceState for service " + serviceName, t);
+         if (t instanceof Exception)
+         {
+            this.setStateException = (Exception) t;
+         }
+         else
+         {
+            this.setStateException = new Exception(t);
+         }
+      }
+
+      /**
+       * @return the caller-supplied classloader if one was given and is still reachable,
+       *         otherwise the classloader of this class
+       */
+      private ClassLoader getStateTransferClassLoader()
+      {
+         ClassLoader cl = classloader == null ? null : classloader.get();
+         if (cl == null)
+         {
+            cl = this.getClass().getClassLoader();
+         }
+         return cl;
+      }
+
+   }
@SuppressWarnings("unchecked")
private static Vector<Address> cloneMembers(View view)
More information about the jboss-cvs-commits
mailing list