[jboss-cvs] JBossAS SVN: r107153 - in projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core: jgroups/blocks/mux and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 28 10:41:08 EDT 2010
Author: pferraro
Date: 2010-07-28 10:41:07 -0400 (Wed, 28 Jul 2010)
New Revision: 107153
Modified:
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/AsynchEventHandler.java
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ClusterNodeImpl.java
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DistributedReplicantManagerImpl.java
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/RspFilterAdapter.java
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java
projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java
Log:
Fix compiler warnings
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/AsynchEventHandler.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/AsynchEventHandler.java 2010-07-28 13:39:29 UTC (rev 107152)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/AsynchEventHandler.java 2010-07-28 14:41:07 UTC (rev 107153)
@@ -91,6 +91,7 @@
events.add(event);
}
+ @Override
public void run()
{
log.debug("Begin " + name + " Thread");
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ClusterNodeImpl.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ClusterNodeImpl.java 2010-07-28 13:39:29 UTC (rev 107152)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/ClusterNodeImpl.java 2010-07-28 14:41:07 UTC (rev 107153)
@@ -77,19 +77,22 @@
// Public --------------------------------------------------------
+ @Override
public String getName()
{
return this.id;
}
+ @Override
public InetAddress getIpAddress()
{
return this.address.getInetAddress();
}
+ @Override
public int getPort()
{
- return this.address.getPort();
+ return this.address.getPort().intValue();
}
// Package protected ----------------------------------------------
@@ -101,6 +104,7 @@
// Comparable implementation ----------------------------------------------
+ @Override
public int compareTo(ClusterNode o)
{
if (o == null)
@@ -114,6 +118,7 @@
// java.lang.Object overrides ---------------------------------------------------
+ @Override
public boolean equals(Object obj)
{
if (this == obj) return true;
@@ -124,11 +129,13 @@
return this.id.equals(other.id);
}
+ @Override
public int hashCode()
{
return id.hashCode();
}
+ @Override
public String toString()
{
return this.getName();
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java 2010-07-28 13:39:29 UTC (rev 107152)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/CoreGroupCommunicationService.java 2010-07-28 14:41:07 UTC (rev 107153)
@@ -80,8 +80,8 @@
import org.jgroups.UpHandler;
import org.jgroups.Version;
import org.jgroups.View;
-import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.Request;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.blocks.mux.MuxRpcDispatcher;
@@ -158,52 +158,52 @@
private boolean channelSelfConnected;
/** The JGroups channel */
- private Channel channel;
+ Channel channel;
/** the local JG IP Address */
private Address localJGAddress = null;
/** me as a ClusterNode */
- private ClusterNode me = null;
+ ClusterNode me = null;
/** The current view of the group */
private volatile GroupView groupView = new GroupView();
private long method_call_timeout=60000;
- private Short scopeId;
+ Short scopeId;
private RpcDispatcher dispatcher = null;
- private final Map<String, Object> rpcHandlers = new ConcurrentHashMap<String, Object>();
+ final Map<String, Object> rpcHandlers = new ConcurrentHashMap<String, Object>();
private boolean directlyInvokeLocal;
- private final Map<String, WeakReference<ClassLoader>> clmap = new ConcurrentHashMap<String, WeakReference<ClassLoader>>();
+ final Map<String, WeakReference<ClassLoader>> clmap = new ConcurrentHashMap<String, WeakReference<ClassLoader>>();
/** Do we send any membership change notifications synchronously? */
private boolean allowSyncListeners = false;
/** The asynchronously invoked GroupMembershipListeners */
- private final ArrayList<GroupMembershipListener> asyncMembershipListeners = new ArrayList<GroupMembershipListener>();
+ final ArrayList<GroupMembershipListener> asyncMembershipListeners = new ArrayList<GroupMembershipListener>();
/** The HAMembershipListener and HAMembershipExtendedListeners */
private final ArrayList<GroupMembershipListener> syncMembershipListeners = new ArrayList<GroupMembershipListener>();
/** The handler used to send membership change notifications asynchronously */
private AsynchEventHandler asynchHandler;
private long state_transfer_timeout=60000;
- private String stateIdPrefix;
- private final Map<String, StateTransferProvider> stateProviders = new HashMap<String, StateTransferProvider>();
- private final Map<String, StateTransferTask<?, ?>> stateTransferTasks = new Hashtable<String, StateTransferTask<?, ?>>();
+ String stateIdPrefix;
+ final Map<String, StateTransferProvider> stateProviders = new HashMap<String, StateTransferProvider>();
+ final Map<String, StateTransferTask<?, ?>> stateTransferTasks = new Hashtable<String, StateTransferTask<?, ?>>();
@SuppressWarnings("unchecked")
- private final ContextClassLoaderSwitcher classLoaderSwitcher = (ContextClassLoaderSwitcher) AccessController.doPrivileged(ContextClassLoaderSwitcher.INSTANTIATOR);
+ final ContextClassLoaderSwitcher classLoaderSwitcher = (ContextClassLoaderSwitcher) AccessController.doPrivileged(ContextClassLoaderSwitcher.INSTANTIATOR);
/** The cluster instance log category */
protected Logger log = Logger.getLogger(getClass().getName());;
- private Logger clusterLifeCycleLog = Logger.getLogger(getClass().getName() + ".lifecycle");
+ Logger clusterLifeCycleLog = Logger.getLogger(getClass().getName() + ".lifecycle");
private final Vector<String> history = new Vector<String>();
private int maxHistoryLength = 100;
/** Thread pool used to run state transfer requests */
- private Executor threadPool;
+ Executor threadPool;
- private final ThreadGate flushBlockGate = new ThreadGate();
+ final ThreadGate flushBlockGate = new ThreadGate();
private final ClusterNodeFactory nodeFactory = new ClusterNodeFactoryImpl();
- private final Object channelLock = new Object();
+ final Object channelLock = new Object();
private int state = UNREGISTERED;
@@ -213,16 +213,19 @@
// GroupCommunicationService implementation ----------------------
+ @Override
public boolean isConsistentWith(GroupCommunicationService other)
{
return this == other;
}
+ @Override
public String getNodeName()
{
return this.me == null ? null : this.me.getName();
}
+ @Override
public String getGroupName()
{
return this.groupName;
@@ -239,11 +242,13 @@
return result;
}
+ @Override
public long getCurrentViewId()
{
return this.groupView.viewId;
}
+ @Override
public ClusterNode[] getClusterNodes ()
{
GroupView curView = this.groupView;
@@ -253,6 +258,7 @@
}
}
+ @Override
public ClusterNode getClusterNode ()
{
return this.me;
@@ -277,6 +283,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void registerRPCHandler(String objName, Object subscriber)
{
this.rpcHandlers.put(objName, subscriber);
@@ -285,6 +292,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void registerRPCHandler(String objName, Object subscriber, ClassLoader classloader)
{
this.registerRPCHandler(objName, subscriber);
@@ -294,6 +302,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void unregisterRPCHandler(String objName, Object subscriber)
{
this.rpcHandlers.remove(objName);
@@ -303,6 +312,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public ArrayList<?> callMethodOnCluster(String serviceName, String methodName,
Object[] args, Class<?>[] types, boolean excludeSelf) throws InterruptedException
{
@@ -312,6 +322,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public ArrayList<?> callMethodOnCluster(String serviceName, String methodName,
Object[] args, Class<?>[] types, boolean excludeSelf, ResponseFilter filter) throws InterruptedException
{
@@ -321,13 +332,14 @@
/**
* {@inheritDoc}
*/
+ @Override
public <T> ArrayList<T> callMethodOnCluster(String serviceName, String methodName, Object[] args, Class<?>[] types,
Class<T> returnType, boolean excludeSelf, ResponseFilter filter, long methodTimeout, boolean unordered)
throws InterruptedException
{
MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
RspFilterAdapter rspFilter = filter == null ? null : new RspFilterAdapter(filter, this.nodeFactory);
- RequestOptions ro = new RequestOptions( GroupRequest.GET_ALL, methodTimeout, false, rspFilter);
+ RequestOptions ro = new RequestOptions(Request.GET_ALL, methodTimeout, false, rspFilter);
if (excludeSelf)
{
ro.setExclusionList(this.localJGAddress);
@@ -339,7 +351,7 @@
}
boolean trace = this.log.isTraceEnabled();
- if(trace)
+ if (trace)
{
this.log.trace("calling synchronous method on cluster, serviceName="+serviceName
+", methodName="+methodName+", members="+this.groupView+", excludeSelf="+excludeSelf);
@@ -369,7 +381,7 @@
return result;
}
- private <T> T invokeDirectly(String serviceName, String methodName, Object[] args, Class<?>[] types, Class<T> returnType, List<T> remoteResponses, ResponseFilter filter) throws Exception
+ <T> T invokeDirectly(String serviceName, String methodName, Object[] args, Class<?>[] types, Class<T> returnType, List<T> remoteResponses, ResponseFilter filter) throws Exception
{
T retVal = null;
Object handler = this.rpcHandlers.get(serviceName);
@@ -408,6 +420,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public Object callMethodOnCoordinatorNode(String serviceName, String methodName,
Object[] args, Class<?>[] types,boolean excludeSelf) throws Exception
{
@@ -417,6 +430,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public <T> T callMethodOnCoordinatorNode(String serviceName, String methodName,
Object[] args, Class<?>[] types, Class<T> returnType, boolean excludeSelf, long methodTimeout, boolean unordered) throws Exception
{
@@ -454,7 +468,7 @@
}
Address coord = this.groupView.coordinator;
- RequestOptions opt = new RequestOptions( GroupRequest.GET_ALL, methodTimeout);
+ RequestOptions opt = new RequestOptions(Request.GET_ALL, methodTimeout);
if (unordered)
{
opt.setFlags(Message.OOB);
@@ -480,6 +494,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public Object callMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
ClusterNode targetNode) throws Exception
{
@@ -489,6 +504,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public Object callMethodOnNode(String serviceName, String methodName,
Object[] args, Class<?>[] types, long methodTimeout, ClusterNode targetNode) throws Exception
{
@@ -498,6 +514,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public <T> T callMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
Class<T> returnType, long methodTimeout, ClusterNode targetNode, boolean unordered)
throws Exception
@@ -530,7 +547,7 @@
}
Object rsp = null;
- RequestOptions opt = new RequestOptions(GroupRequest.GET_FIRST, methodTimeout);
+ RequestOptions opt = new RequestOptions(Request.GET_FIRST, methodTimeout);
if (unordered)
{
opt.setFlags(Message.OOB);
@@ -564,6 +581,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void callAsyncMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
ClusterNode targetNode) throws Exception
{
@@ -573,6 +591,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void callAsyncMethodOnNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
ClusterNode targetNode, boolean unordered) throws Exception
{
@@ -597,7 +616,7 @@
return;
}
- RequestOptions opt = new RequestOptions(GroupRequest.GET_NONE, this.getMethodCallTimeout());
+ RequestOptions opt = new RequestOptions(Request.GET_NONE, this.getMethodCallTimeout());
if (unordered)
{
opt.setFlags(Message.OOB);
@@ -623,6 +642,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void callAsynchMethodOnCluster(String serviceName, String methodName,
Object[] args, Class<?>[] types, boolean excludeSelf) throws InterruptedException
{
@@ -632,11 +652,12 @@
/**
* {@inheritDoc}
*/
+ @Override
public void callAsynchMethodOnCluster(final String serviceName, final String methodName, final Object[] args, final Class<?>[] types,
boolean excludeSelf, boolean unordered) throws InterruptedException
{
MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
- RequestOptions ro = new RequestOptions( GroupRequest.GET_NONE, this.getMethodCallTimeout());
+ RequestOptions ro = new RequestOptions(Request.GET_NONE, this.getMethodCallTimeout());
if (excludeSelf)
{
ro.setExclusionList(this.localJGAddress);
@@ -665,12 +686,14 @@
}
+ @Override
public void callAsyncMethodOnCoordinatorNode(String serviceName, String methodName, Object[] args, Class<?>[] types,
boolean excludeSelf) throws Exception
{
this.callAsyncMethodOnCoordinatorNode(serviceName, methodName, args, types, excludeSelf, false);
}
+ @Override
public void callAsyncMethodOnCoordinatorNode(String serviceName, String methodName, Object[] args,
Class<?>[] types, boolean excludeSelf, boolean unordered) throws Exception
{
@@ -705,7 +728,7 @@
}
Address coord = this.groupView.coordinator;
- RequestOptions opt = new RequestOptions( GroupRequest.GET_ALL, this.getMethodCallTimeout());
+ RequestOptions opt = new RequestOptions(Request.GET_ALL, this.getMethodCallTimeout());
if (unordered)
{
opt.setFlags(Message.OOB);
@@ -757,11 +780,13 @@
this.allowSyncListeners = allowSync;
}
+ @Override
public void registerGroupMembershipListener(GroupMembershipListener listener)
{
registerGroupMembershipListener(listener, false);
}
+ @Override
public void unregisterGroupMembershipListener(GroupMembershipListener listener)
{
unregisterGroupMembershipListener(listener, false);
@@ -782,6 +807,7 @@
this.state_transfer_timeout = timeout;
}
+ @Override
public Future<SerializableStateTransferResult> getServiceState(String serviceName, ClassLoader classloader)
{
RunnableFuture<SerializableStateTransferResult> future = null;
@@ -811,11 +837,13 @@
return future;
}
+ @Override
public Future<SerializableStateTransferResult> getServiceState(String serviceName)
{
return getServiceState(serviceName, null);
}
+ @Override
public Future<StreamStateTransferResult> getServiceStateAsStream(String serviceName)
{
RunnableFuture<StreamStateTransferResult> future = null;
@@ -845,11 +873,13 @@
return future;
}
+ @Override
public void registerStateTransferProvider(String serviceName, StateTransferProvider provider)
{
this.stateProviders.put(serviceName, provider);
}
+ @Override
public void unregisterStateTransferProvider(String serviceName)
{
this.stateProviders.remove(serviceName);
@@ -952,7 +982,9 @@
this.stackName = stackName;
}
- public long getMethodCallTimeout() {
+ @Override
+ public long getMethodCallTimeout()
+ {
return this.method_call_timeout;
}
@@ -1291,7 +1323,7 @@
/**
* Creates an object from a byte buffer
*/
- private Object objectFromByteBufferInternal (byte[] buffer) throws Exception
+ Object objectFromByteBufferInternal (byte[] buffer) throws Exception
{
if(buffer == null)
{
@@ -1307,7 +1339,7 @@
* Serializes an object into a byte buffer.
* The object has to implement interface Serializable or Externalizable
*/
- private byte[] objectToByteBufferInternal (Object obj) throws Exception
+ byte[] objectToByteBufferInternal (Object obj) throws Exception
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
@@ -1319,7 +1351,7 @@
/**
* Creates a response object from a byte buffer - optimized for response marshalling
*/
- private Object objectFromByteBufferResponseInternal (byte[] buffer) throws Exception
+ Object objectFromByteBufferResponseInternal (byte[] buffer) throws Exception
{
if(buffer == null)
{
@@ -1342,7 +1374,7 @@
* Serializes a response object into a byte buffer, optimized for response marshalling.
* The object has to implement interface Serializable or Externalizable
*/
- private byte[] objectToByteBufferResponseInternal (Object obj) throws Exception
+ byte[] objectToByteBufferResponseInternal (Object obj) throws Exception
{
if (obj == null)
{
@@ -1390,10 +1422,8 @@
{
continue;
}
- else
- {
- rtn.add(returnType.cast(item));
- }
+
+ rtn.add(returnType.cast(item));
}
else if( trace )
{
@@ -1405,7 +1435,7 @@
return rtn;
}
- private GroupView processViewChange(View newView) throws Exception
+ GroupView processViewChange(View newView) throws Exception
{
GroupView oldMembers = this.groupView;
GroupView newGroupView = new GroupView(newView, oldMembers, this.nodeFactory);
@@ -1539,7 +1569,7 @@
}
}
- private static Vector<ClusterNode> translateAddresses(Vector<Address> addresses, ClusterNodeFactory factory)
+ static Vector<ClusterNode> translateAddresses(Vector<Address> addresses, ClusterNodeFactory factory)
{
if (addresses == null)
{
@@ -1562,7 +1592,7 @@
* @param newMembers Vector of new members
* @return Vector of members that have died between the two views, can be empty.
*/
- private static Vector<ClusterNode> getDeadMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> newMembers)
+ static Vector<ClusterNode> getDeadMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> newMembers)
{
if(oldMembers == null)
{
@@ -1583,7 +1613,7 @@
* @param allMembers Vector of new members
* @return Vector of members that have joined the partition between the two views
*/
- private static Vector<ClusterNode> getNewMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> allMembers)
+ static Vector<ClusterNode> getNewMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> allMembers)
{
if(oldMembers == null)
{
@@ -1598,7 +1628,7 @@
return newMembers;
}
- private void notifyListeners(ArrayList<GroupMembershipListener> theListeners, long viewID,
+ void notifyListeners(ArrayList<GroupMembershipListener> theListeners, long viewID,
Vector<ClusterNode> allMembers, Vector<ClusterNode> deadMembers, Vector<ClusterNode> newMembers,
Vector<List<ClusterNode>> originatingGroups)
{
@@ -1635,7 +1665,7 @@
}
@SuppressWarnings("unchecked")
- private static Vector<Address> cloneMembers(View view)
+ static Vector<Address> cloneMembers(View view)
{
return (Vector<Address>) view.getMembers().clone();
}
@@ -1669,7 +1699,7 @@
protected final Vector<Address> jgmembers;
protected final Address coordinator;
- private GroupView()
+ GroupView()
{
this.viewId = -1;
this.deadMembers = new Vector<ClusterNode>();
@@ -1679,7 +1709,7 @@
this.originatingGroups = null;
}
- private GroupView(View newView, GroupView previousView, ClusterNodeFactory factory)
+ GroupView(View newView, GroupView previousView, ClusterNodeFactory factory)
{
this.viewId = newView.getVid().getId();
this.jgmembers = cloneMembers(newView);
@@ -1707,14 +1737,16 @@
/**
* Marshalls request payloads for transmission across the cluster.
*/
- private class RequestMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
+ class RequestMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
{
+ @Override
public Object objectFromByteBuffer(byte[] buf) throws Exception
{
return CoreGroupCommunicationService.this.objectFromByteBufferInternal(buf);
}
+ @Override
public byte[] objectToByteBuffer(Object obj) throws Exception
{
// wrap MethodCall in Object[service_name, byte[]] so that service name is available during demarshalling
@@ -1733,9 +1765,10 @@
/**
* Marshalls responses for transmission across the cluster.
*/
- private class ResponseMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
+ class ResponseMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
{
+ @Override
public Object objectFromByteBuffer(byte[] buf) throws Exception
{
boolean trace = CoreGroupCommunicationService.this.log.isTraceEnabled();
@@ -1779,6 +1812,7 @@
}
}
+ @Override
public byte[] objectToByteBuffer(Object obj) throws Exception
{
return CoreGroupCommunicationService.this.objectToByteBufferResponseInternal(obj);
@@ -1791,7 +1825,7 @@
*/
private class RpcHandler extends MuxRpcDispatcher implements StateTransferFilter
{
- private RpcHandler(short scopeId, Channel channel, MessageListener messageListener,
+ RpcHandler(short scopeId, Channel channel, MessageListener messageListener,
MembershipListener membershipListener,
Marshaller reqMarshaller, Marshaller rspMarshaller)
{
@@ -1815,6 +1849,7 @@
* @param req The org.jgroups. representation of the method invocation
* @return The serializable return value from the invocation
*/
+ @Override
public Object handle(Message req)
{
Object body = null;
@@ -1963,12 +1998,12 @@
// Replace the handler again! TODO get this in superclass
Muxer<UpHandler> muxer = this.getMuxer();
if (muxer != null) {
- muxer.add(scopeId, new DelegatingStateTransferUpHandler(this.getProtocolAdapter(), this));
+ muxer.add(scopeId.shortValue(), new DelegatingStateTransferUpHandler(this.getProtocolAdapter(), this));
}
else
{
muxer = new MuxUpHandler(this.channel.getUpHandler());
- muxer.add(scopeId, new DelegatingStateTransferUpHandler(this.getProtocolAdapter(), this));
+ muxer.add(scopeId.shortValue(), new DelegatingStateTransferUpHandler(this.getProtocolAdapter(), this));
this.channel.setUpHandler((UpHandler) muxer);
}
}
@@ -1977,11 +2012,12 @@
public void stop() {
Muxer<UpHandler> muxer = this.getMuxer();
if (muxer != null) {
- muxer.remove(scopeId);
+ muxer.remove(scopeId.shortValue());
}
super.stop();
}
+ @Override
public boolean accepts(String stateId)
{
return stateId != null && stateId.startsWith(CoreGroupCommunicationService.this.stateIdPrefix );
@@ -1999,8 +2035,9 @@
* Handles callbacks from the thread that asynchronously deals with
* view change events.
*/
- private class ViewChangeEventProcessor implements AsynchEventHandler.AsynchEventProcessor
+ class ViewChangeEventProcessor implements AsynchEventHandler.AsynchEventProcessor
{
+ @Override
public void processEvent(Object event)
{
GroupView vce = (GroupView) event;
@@ -2021,7 +2058,7 @@
* @author Brian Goetz and Tim Peierls
*/
- private static class ThreadGate
+ static class ThreadGate
{
private static final int OPEN = 1;
private static final int CLOSED = -1;
@@ -2071,10 +2108,11 @@
/**
* Converts JGroups address objects into ClusterNode
*/
- private class ClusterNodeFactoryImpl implements ClusterNodeFactory
+ class ClusterNodeFactoryImpl implements ClusterNodeFactory
{
private final ConcurrentMap<Address, IpAddress> addressMap = new ConcurrentHashMap<Address, IpAddress>();
+ @Override
public ClusterNode getClusterNode(Address a)
{
IpAddress result = addressMap.get(a);
@@ -2087,7 +2125,7 @@
}
addressMap.put(a, result);
}
- AddressPort addrPort = new AddressPort(result.getIpAddress(), result.getPort());
+ AddressPort addrPort = new AddressPort(result.getIpAddress(), Integer.valueOf(result.getPort()));
String id = channel.getName(a);
if (id == null)
{
@@ -2134,8 +2172,9 @@
/**
* Handles MembershipListener callbacks from JGroups Channel
*/
- private class MembershipListenerImpl implements ExtendedMembershipListener
+ class MembershipListenerImpl implements ExtendedMembershipListener
{
+ @Override
public void suspect(org.jgroups.Address suspected_mbr)
{
CoreGroupCommunicationService.this.logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
@@ -2149,12 +2188,14 @@
}
}
+ @Override
public void block()
{
CoreGroupCommunicationService.this.flushBlockGate.close();
CoreGroupCommunicationService.this.log.debug("Block processed at " + CoreGroupCommunicationService.this.me);
}
+ @Override
public void unblock()
{
CoreGroupCommunicationService.this.flushBlockGate.open();
@@ -2171,6 +2212,7 @@
*
* @param newView
*/
+ @Override
public void viewAccepted(View newView)
{
try
@@ -2192,15 +2234,15 @@
/**
* Handles MessageListener callbacks from the JGroups layer.
*/
- private class MessageListenerImpl
- implements ExtendedMessageListener
+ class MessageListenerImpl implements ExtendedMessageListener
{
-
+ @Override
public void receive(org.jgroups.Message msg)
{
// no-op
}
+ @Override
public void getState(String state_id, OutputStream ostream)
{
String serviceName = extractServiceName(state_id);
@@ -2247,6 +2289,7 @@
}
}
+ @Override
public byte[] getState(String state_id)
{
String serviceName = extractServiceName(state_id);
@@ -2290,6 +2333,7 @@
return null; // This will cause the receiver to get a "false" on the channel.getState() call
}
+ @Override
public void setState(String state_id, byte[] state)
{
String serviceName = extractServiceName(state_id);
@@ -2308,6 +2352,7 @@
}
}
+ @Override
public void setState(String state_id, InputStream istream)
{
String serviceName = extractServiceName(state_id);
@@ -2336,21 +2381,25 @@
}
}
+ @Override
public byte[] getState()
{
throw new UnsupportedOperationException("Only partial state transfer (with a state_id) is supported");
}
+ @Override
public void getState(OutputStream stream)
{
throw new UnsupportedOperationException("Only partial state transfer (with a state_id) is supported");
}
+ @Override
public void setState(byte[] obj)
{
throw new UnsupportedOperationException("Only partial state transfer (with a state_id) is supported");
}
+ @Override
public void setState(InputStream stream)
{
throw new UnsupportedOperationException("Only partial state transfer (with a state_id) is supported");
@@ -2377,7 +2426,7 @@
V state;
private boolean isStateSet;
private Exception setStateException;
- private T result;
+ T result;
private final Object callMutex = new Object();
StateTransferTask(String serviceName)
@@ -2385,6 +2434,7 @@
this.serviceName = serviceName;
}
+ @Override
public T call() throws Exception
{
synchronized (callMutex)
@@ -2575,26 +2625,29 @@
protected SerializableStateTransferResult createStateTransferResult(final boolean gotState, final Serializable state,
final Exception exception)
{
- return new SerializableStateTransferResult() {
-
+ return new SerializableStateTransferResult()
+ {
+ @Override
public Serializable getState()
{
return state;
}
+ @Override
public Exception getStateTransferException()
{
return exception;
}
+ @Override
public boolean stateReceived()
{
return gotState;
}
-
};
}
+ @Override
protected void setStateInternal(InputStream is) throws IOException, ClassNotFoundException
{
ClassLoader cl = getStateTransferClassLoader();
@@ -2632,26 +2685,29 @@
protected StreamStateTransferResult createStateTransferResult(final boolean gotState, final InputStream state,
final Exception exception)
{
- return new StreamStateTransferResult() {
-
+ return new StreamStateTransferResult()
+ {
+ @Override
public InputStream getState()
{
return state;
}
+ @Override
public Exception getStateTransferException()
{
return exception;
}
+ @Override
public boolean stateReceived()
{
return gotState;
}
-
};
}
+ @Override
protected void setStateInternal(InputStream is) throws IOException, ClassNotFoundException
{
this.state = is;
@@ -2669,7 +2725,7 @@
private final Object[] args;
private final Class<?>[] types;
- private AsynchronousLocalInvocation(String serviceName, String methodName, Object[] args, Class<?>[] types)
+ AsynchronousLocalInvocation(String serviceName, String methodName, Object[] args, Class<?>[] types)
{
this.serviceName = serviceName;
this.methodName = methodName;
@@ -2677,6 +2733,7 @@
this.types = types;
}
+ @Override
public void run()
{
try
@@ -2701,7 +2758,5 @@
run();
}
}
-
}
-
}
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DistributedReplicantManagerImpl.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DistributedReplicantManagerImpl.java 2010-07-28 13:39:29 UTC (rev 107152)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/DistributedReplicantManagerImpl.java 2010-07-28 14:41:07 UTC (rev 107153)
@@ -56,6 +56,7 @@
* @author <a href="mailto:pferraro at redhat.com">Paul Ferraro</a>
* @version $Revision$
*/
+@SuppressWarnings("deprecation")
public class DistributedReplicantManagerImpl
implements DistributedReplicantManagerImplMBean,
HAPartition.HAMembershipExtendedListener,
@@ -72,7 +73,7 @@
private static final Class<?>[] remove_types = new Class<?>[] { String.class, String.class };
// Attributes ----------------------------------------------------
- private static final AtomicInteger threadID = new AtomicInteger();
+ static final AtomicInteger threadID = new AtomicInteger();
private final ConcurrentMap<String, Serializable> localReplicants = new ConcurrentHashMap<String, Serializable>();
private final ConcurrentMap<String, ConcurrentMap<String, Serializable>> replicants = new ConcurrentHashMap<String, ConcurrentMap<String, Serializable>>();
@@ -83,7 +84,7 @@
/** The handler used to send replicant change notifications asynchronously */
private final AsynchEventHandler asynchHandler;
- private final Logger log;
+ final Logger log;
private String nodeName = null;
@@ -98,11 +99,6 @@
{
super();
- if (partition == null)
- {
- throw new NullPointerException("partition is null");
- }
-
this.partition = partition;
this.log = Logger.getLogger(this.getClass().getName() + "." + partition.getPartitionName());
@@ -189,6 +185,7 @@
// @ManagementProperty(use={ViewUse.STATISTIC}, description="The partition's name")
// @ManagementObjectID(type="DistributedReplicantManager")
+ @Override
public String getPartitionName()
{
return this.partition.getPartitionName();
@@ -202,6 +199,7 @@
// @ManagementOperation(name="listDRMContent",
// description="List all known keys and the nodes that have registered bindings",
// impact=Impact.ReadOnly)
+ @Override
public String listContent() throws Exception
{
StringBuilder result = new StringBuilder();
@@ -248,6 +246,7 @@
// @ManagementOperation(name="listDRMContentAsXml",
// description="List in XML format all known services and the nodes that have registered bindings",
// impact=Impact.ReadOnly)
+ @Override
public String listXmlContent() throws Exception
{
StringBuilder result = new StringBuilder();
@@ -292,6 +291,7 @@
// HAPartition.HAPartitionStateTransfer implementation ----------------------------------------------
+ @Override
public Serializable getCurrentState()
{
Map<String, ConcurrentMap<String, Serializable>> result = new HashMap<String, ConcurrentMap<String, Serializable>>();
@@ -323,14 +323,15 @@
}
@SuppressWarnings("unchecked")
+ @Override
public void setCurrentState(Serializable newState)
{
Object[] globalState = (Object[]) newState;
- Map<String, ConcurrentMap<String, Serializable>> map = (Map) globalState[0];
+ Map<String, ConcurrentMap<String, Serializable>> map = (Map<String, ConcurrentMap<String, Serializable>>) globalState[0];
this.replicants.putAll(map);
- this.intraviewIdCache = (Map) globalState[1];
+ this.intraviewIdCache = (Map<String, Integer>) globalState[1];
if (this.log.isTraceEnabled())
{
@@ -343,6 +344,7 @@
// @ManagementOperation(name="getAllDRMServices",
// description="Get a collection of the names of all keys for which we have bindings",
// impact=Impact.ReadOnly)
+ @Override
public Collection<String> getAllServices()
{
Set<String> services = new HashSet<String>();
@@ -353,7 +355,8 @@
// HAPartition.HAMembershipListener implementation ----------------------------------------------
- @SuppressWarnings("unchecked")
+ @Override
+ @SuppressWarnings({"unchecked", "rawtypes"})
public void membershipChangedDuringMerge(Vector deadMembers, Vector newMembers, Vector allMembers, Vector originatingGroups)
{
// Here we only care about deadMembers. Purge all replicant lists of deadMembers
@@ -369,7 +372,8 @@
}
}
- @SuppressWarnings("unchecked")
+ @Override
+ @SuppressWarnings({"unchecked", "rawtypes"})
public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
{
// Here we only care about deadMembers. Purge all replicant lists of deadMembers
@@ -386,6 +390,7 @@
// AsynchEventHandler.AsynchEventProcessor implementation -----------------
+ @Override
public void processEvent(Object event)
{
KeyChangeEvent kce = (KeyChangeEvent) event;
@@ -401,6 +406,7 @@
// DistributedReplicantManager implementation ----------------------------------------------
+ @Override
public void add(String key, Serializable replicant) throws Exception
{
if (this.log.isTraceEnabled())
@@ -426,6 +432,7 @@
this.notifyKeyListeners(key, replicants, false);
}
+ @Override
public void remove(String key) throws Exception
{
this.partitionNameKnown.await(); // we don't propagate until our name is known
@@ -460,11 +467,13 @@
}
}
+ @Override
public Serializable lookupLocalReplicant(String key)
{
return this.localReplicants.get(key);
}
+ @Override
public List<Serializable> lookupReplicants(String key)
{
Serializable local = this.localReplicants.get(key);
@@ -520,6 +529,7 @@
// impact=Impact.ReadOnly,
// params={@ManagementParameter(name="key",
// description="The name of the service")})
+ @Override
public List<String> lookupReplicantsNodeNames(String key)
{
List<ClusterNode> nodes = this.lookupReplicantsNodes(key);
@@ -536,6 +546,7 @@
return nodeNames;
}
+ @Override
public List<ClusterNode> lookupReplicantsNodes(String key)
{
boolean local = this.localReplicants.containsKey(key);
@@ -567,6 +578,7 @@
return rtn;
}
+ @Override
public void registerListener(String key, ReplicantListener subscriber)
{
List<ReplicantListener> list = new CopyOnWriteArrayList<ReplicantListener>();
@@ -576,6 +588,7 @@
((existing != null) ? existing : list).add(subscriber);
}
+ @Override
public void unregisterListener(String key, DistributedReplicantManager.ReplicantListener subscriber)
{
List<ReplicantListener> listeners = this.keyListeners.get(key);
@@ -594,6 +607,7 @@
// impact=Impact.ReadOnly,
// params={@ManagementParameter(name="key",
// description="The name of the service")})
+ @Override
public int getReplicantsViewId(String key)
{
Integer result = this.intraviewIdCache.get(key);
@@ -605,6 +619,7 @@
// description="Returns whether the DRM considers this node to be the master for the given service",
// impact=Impact.ReadOnly,
// params={@ManagementParameter(name="key", description="The name of the service")})
+ @Override
public boolean isMasterReplica(String key)
{
if (this.log.isTraceEnabled())
@@ -1074,6 +1089,7 @@
* Called when the service needs to merge with another partition. This
* process is performed asynchronously
*/
+ @Override
public void run()
{
DistributedReplicantManagerImpl.this.log.debug("Sleeping for 50ms before mergeMembers");
@@ -1103,6 +1119,7 @@
* Called when service needs to re-publish its local replicants to other
* cluster members after this node has joined the cluster.
*/
+ @Override
public void run()
{
DistributedReplicantManagerImpl.this.log.debug("DRM: Sleeping before re-publishing for 50ms just in case");
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java 2010-07-28 13:39:29 UTC (rev 107152)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/HAPartitionImpl.java 2010-07-28 14:41:07 UTC (rev 107153)
@@ -31,6 +31,7 @@
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
+import org.jboss.ha.framework.interfaces.DistributedState;
import org.jboss.ha.framework.interfaces.GroupMembershipListener;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.SerializableStateTransferResult;
@@ -64,8 +65,7 @@
/** The cluster replicant manager */
private DistributedReplicantManagerImpl replicantManager;
/** The DistributedState service we manage */
- @SuppressWarnings("deprecation")
- private org.jboss.ha.framework.interfaces.DistributedState distributedState;
+ private DistributedState distributedState;
// Static --------------------------------------------------------
@@ -79,18 +79,20 @@
// HAPartition implementation ----------------------------------------------
+ @Override
public String getPartitionName()
{
return getGroupName();
}
+ @Override
public DistributedReplicantManager getDistributedReplicantManager()
{
return this.replicantManager;
}
- @SuppressWarnings("deprecation")
- public org.jboss.ha.framework.interfaces.DistributedState getDistributedStateService()
+ @Override
+ public DistributedState getDistributedStateService()
{
return this.distributedState;
}
@@ -101,6 +103,7 @@
// *************************
// *************************
+ @Override
@SuppressWarnings("deprecation")
public void registerMembershipListener(HAMembershipListener listener)
{
@@ -113,6 +116,7 @@
registerGroupMembershipListener(adapter, !isAsynch);
}
+ @Override
@SuppressWarnings("deprecation")
public void unregisterMembershipListener(HAMembershipListener listener)
{
@@ -131,6 +135,7 @@
// *************************
// *************************
+ @Override
@SuppressWarnings("deprecation")
public void subscribeToStateTransferEvents(String serviceName, HAPartitionStateTransfer subscriber)
{
@@ -139,6 +144,7 @@
this.initialStateRecipients.put(serviceName, subscriber);
}
+ @Override
@SuppressWarnings("deprecation")
public void unsubscribeFromStateTransferEvents(String serviceName, HAPartitionStateTransfer subscriber)
{
@@ -148,8 +154,7 @@
// Public ------------------------------------------------------------------
- @SuppressWarnings("deprecation")
- public void setDistributedStateImpl(org.jboss.ha.framework.interfaces.DistributedState distributedState)
+ public void setDistributedStateImpl(DistributedState distributedState)
{
this.distributedState = distributedState;
}
@@ -197,7 +202,7 @@
}
@Override
- protected void startService() throws Exception
+ protected void startService() throws Exception
{
this.logHistory ("Starting partition " + this.getPartitionName());
@@ -295,11 +300,9 @@
{
throw result.getStateTransferException();
}
- else
- {
- entry.getValue().setCurrentState(result.getState());
- log.debug("Received state for " + entry.getKey());
- }
+
+ entry.getValue().setCurrentState(result.getState());
+ log.debug("Received state for " + entry.getKey());
}
else if (result.getStateTransferException() != null)
{
@@ -328,12 +331,14 @@
this.target = target;
}
+ @Override
public void membershipChanged(List<ClusterNode> deadMembers, List<ClusterNode> newMembers,
List<ClusterNode> allMembers)
{
target.membershipChanged(castMembers(deadMembers), castMembers(newMembers), castMembers(allMembers));
}
+ @Override
public void membershipChangedDuringMerge(List<ClusterNode> deadMembers, List<ClusterNode> newMembers,
List<ClusterNode> allMembers, List<List<ClusterNode>> originatingGroups)
{
@@ -348,11 +353,13 @@
}
+ @Override
public boolean equals(Object obj)
{
return obj instanceof GroupMembershipListenerAdapter && ((GroupMembershipListenerAdapter) obj).target.equals(target);
}
+ @Override
public int hashCode()
{
return target.hashCode();
@@ -378,16 +385,15 @@
private final HAPartitionStateTransfer delegate;
@SuppressWarnings("deprecation")
- private StateTransferProviderAdapter(HAPartitionStateTransfer delegate)
+ StateTransferProviderAdapter(HAPartitionStateTransfer delegate)
{
this.delegate = delegate;
}
+ @Override
public Serializable getCurrentState()
{
return delegate.getCurrentState();
}
-
}
-
}
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/RspFilterAdapter.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/RspFilterAdapter.java 2010-07-28 13:39:29 UTC (rev 107152)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/framework/server/RspFilterAdapter.java 2010-07-28 14:41:07 UTC (rev 107153)
@@ -43,11 +43,13 @@
this.factory = factory;
}
+ @Override
public boolean isAcceptable(Object response, Address sender)
{
return filter.isAcceptable(response, factory.getClusterNode(sender));
}
+ @Override
public boolean needMoreResponses()
{
return filter.needMoreResponses();
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java 2010-07-28 13:39:29 UTC (rev 107152)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/DelegatingStateTransferUpHandler.java 2010-07-28 14:41:07 UTC (rev 107153)
@@ -57,6 +57,7 @@
*
* {@inheritDoc}
*/
+ @Override
public Object up(Event evt)
{
return delegate.up(evt);
@@ -67,9 +68,9 @@
*
* {@inheritDoc}
*/
+ @Override
public boolean accepts(String stateId)
{
return filter.accepts(stateId);
}
-
}
Modified: projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java
===================================================================
--- projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java 2010-07-28 13:39:29 UTC (rev 107152)
+++ projects/cluster/ha-server-core/trunk/src/main/java/org/jboss/ha/core/jgroups/blocks/mux/MuxUpHandler.java 2010-07-28 14:41:07 UTC (rev 107153)
@@ -13,53 +13,53 @@
* Overrides superclass to allow state transfer multiplexing.
*
* @author Brian Stansberry
- *
- * @version $Id: MuxUpHandler.java,v 1.2 2010/04/15 20:05:22 ferraro Exp $
*/
-public class MuxUpHandler
- extends org.jgroups.blocks.mux.MuxUpHandler {
+public class MuxUpHandler extends org.jgroups.blocks.mux.MuxUpHandler
+{
+ private final Map<Short, UpHandler> stateTransferHandlers = new ConcurrentHashMap<Short, UpHandler>();
+ /**
+ * Creates a multiplexing up handler, with no default handler.
+ */
+ public MuxUpHandler()
+ {
+ super();
+ }
- private final Map<Short, UpHandler> stateTransferHandlers = new ConcurrentHashMap<Short, UpHandler>();
-
- /**
- * Creates a multiplexing up handler, with no default handler.
- */
- public MuxUpHandler() {
- super();
- }
+ /**
+ * Creates a multiplexing up handler using the specified default handler.
+ * @param defaultHandler a default up handler to handle messages with no {@link MuxHeader}
+ */
+ public MuxUpHandler(UpHandler defaultHandler)
+ {
+ super(defaultHandler);
+ }
- /**
- * Creates a multiplexing up handler using the specified default handler.
- * @param defaultHandler a default up handler to handle messages with no {@link MuxHeader}
- */
- public MuxUpHandler(UpHandler defaultHandler) {
- super(defaultHandler);
- }
+ /**
+ * {@inheritDoc}
+ * @see org.jgroups.blocks.mux.Muxer#add(short, java.lang.Object)
+ */
+ @Override
+ public void add(short id, UpHandler handler)
+ {
+ super.add(id, handler);
+ if (handler instanceof StateTransferFilter)
+ {
+ stateTransferHandlers.put(Short.valueOf(id), handler);
+ }
+ }
- /**
- * {@inheritDoc}
- * @see org.jgroups.blocks.mux.Muxer#add(short, java.lang.Object)
- */
- @Override
- public void add(short id, UpHandler handler) {
- super.add(id, handler);
- if (handler instanceof StateTransferFilter)
- {
- stateTransferHandlers.put(id, handler);
- }
- }
+ /**
+ * {@inheritDoc}
+ * @see org.jgroups.blocks.mux.Muxer#remove(short)
+ */
+ @Override
+ public void remove(short id)
+ {
+ super.remove(id);
+ stateTransferHandlers.remove(Short.valueOf(id));
+ }
- /**
- * {@inheritDoc}
- * @see org.jgroups.blocks.mux.Muxer#remove(short)
- */
- @Override
- public void remove(short id) {
- super.remove(id);
- stateTransferHandlers.remove(id);
- }
-
/**
* {@inheritDoc}
* @see org.jgroups.blocks.mux.MuxUpHandler#handleStateTransferEvent(org.jgroups.Event)
@@ -67,8 +67,8 @@
@Override
protected ImmutableReference<Object> handleStateTransferEvent(Event evt)
{
- StateTransferInfo info=(StateTransferInfo)evt.getArg();
- for (UpHandler uh: stateTransferHandlers.values())
+ StateTransferInfo info = (StateTransferInfo) evt.getArg();
+ for (UpHandler uh : stateTransferHandlers.values())
{
if (uh instanceof StateTransferFilter)
{
@@ -78,7 +78,7 @@
}
}
}
-
+
return null;
}
}
More information about the jboss-cvs-commits
mailing list