[jboss-jira] [JBoss JIRA] (JBAS-4526) JBossMQ UIL2 Read/Write Tasks should not be doing initialization

sujit kumar behera (JIRA) jira-events at lists.jboss.org
Sun Mar 4 01:30:37 EST 2012


    [ https://issues.jboss.org/browse/JBAS-4526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12673402#comment-12673402 ] 

sujit kumar behera commented on JBAS-4526:
------------------------------------------

I have tweaked the SocketManager Code .Its finally destroying the read write threads after sometime(nearly 1 hour) when the server is idle.Thread.interrupt is not destrying the threads ,so the count is remaining open .
Have tested it thoroughly it is working fine.
See the code .

package org.jboss.mq.il.uil2;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Iterator;

import javax.jms.JMSException;

import org.jboss.logging.Logger;
import org.jboss.mq.il.uil2.msgs.BaseMsg;
import org.jboss.util.stream.NotifyingBufferedInputStream;
import org.jboss.util.stream.NotifyingBufferedOutputStream;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;

/**
 * Used to manage the client/server and server/client communication in an
 * asynchrounous manner.
 * 
 * @todo verify the pooled executor config
 * 
 * @author Scott.Stark at jboss.org
 * @version $Revision$
 */
public class SocketManager {
	private static Logger log = Logger.getLogger(SocketManager.class);

	private static final int STOPPED = 0;
	private static final int STARTED = 1;
	private static final int STOPPING = 2;
	private static SynchronizedInt taskID = new SynchronizedInt(0);

	/** The socket created by the IL layer */
	private Socket socket;
	/** The input stream used by the read task */
	private ObjectInputStream in;
	/** The buffering for output */
	NotifyingBufferedInputStream bufferedInput;
	/** The output stream used by the write task */
	private ObjectOutputStream out;
	/** The buffering for output */
	NotifyingBufferedOutputStream bufferedOutput;
	/** The write task thread */
	private Thread writeThread;
	/** The read task thread */
	private Thread readThread;
	/** The thread pool used to service incoming requests */
	PooledExecutor pool;
	/** The flag used to control the read loop */
	private int readState = STOPPED;
	/** The flag used to control the write loop */
	private int writeState = STOPPED;
	/** Used for constrolling the state */
	private SynchronizedBoolean running = new SynchronizedBoolean(false);
	/** The queue of messages to be processed by the write task */
	private LinkedQueue sendQueue;
	/** A HashMap<Integer, BaseMsg> that are awaiting a reply */
	private ConcurrentHashMap replyMap;
	/** The callback handler used for msgs that are not replys */
	private SocketManagerHandler handler;
	/** The buffer size */
	private int bufferSize = 1;
	/** The chunk size for notification of stream activity */
	private int chunkSize = 0x40000000;
	/** The logging trace level which is set in the ctor */
	private boolean trace;

	public SocketManager(Socket s) throws IOException {
		socket = s;
		sendQueue = new LinkedQueue();
		replyMap = new ConcurrentHashMap();
		trace = log.isTraceEnabled();
	}

	/**
	 * Start the read and write threads using the given thread group and names
	 * of "UIL2.SocketManager.ReadTask" and "UIL2.SocketManager.WriteTask".
	 * 
	 * @param tg
	 *            the thread group to use for the read and write threads.
	 */
	public void start(ThreadGroup tg) {
		if (trace)
			log.trace("start called", new Exception("Start stack trace"));

		InetAddress inetAddr = socket.getInetAddress();
		String ipAddress = (inetAddr != null) ? inetAddr.getHostAddress()
				: "<unknown>";
		ipAddress += ":" + socket.getPort();
		if (pool == null) {
			// TODO: Check the validity of this config
			pool = new PooledExecutor(5);
			pool.setMinimumPoolSize(1);
			pool.setKeepAliveTime(1000 * 60);
			pool.runWhenBlocked();
			String id = "SocketManager.MsgPool@"
					+ Integer.toHexString(System.identityHashCode(this))
					+ " client=" + ipAddress;
			pool.setThreadFactory(new UILThreadFactory(id));
		}

		ReadTask readTask = new ReadTask();
		readThread = new Thread(tg, readTask, "UIL2.SocketManager.ReadTask#"
				+ taskID.increment() + " client=" + ipAddress);
		readThread.setDaemon(true);

		WriteTask writeTask = new WriteTask();
		writeThread = new Thread(tg, writeTask, "UIL2.SocketManager.WriteTask#"
				+ taskID.increment() + " client=" + ipAddress);
		writeThread.setDaemon(true);

		synchronized (running) {
			readState = STARTED;
			writeState = STARTED;
			running.set(true);
		}

		try {
			readThread.start();
			writeThread.start();
		} catch (Throwable t) {
			try {
				stop();
			} catch (Throwable ignored) {
			}

			try {
				socket.close();
			} catch (Throwable ignored) {
			}
			log.warn("Error starting socket manager threads", t);
		}
	}

	/**
	 * Stop the read and write threads by interrupting them.
	 */
	public void stop() {
		synchronized (running) {
			if (readState == STARTED) {
				readState = STOPPING;
				readThread.interrupt();
				readThread.stop();
			}
			if (writeState == STARTED) {
				writeState = STOPPING;
				writeThread.interrupt();
				writeThread.stop();
			}
			running.set(false);
			if (pool != null) {
				pool.shutdownNow();
				pool = null;
			}
		}
	}

	/**
	 * Set the callback handler for msgs that were not originated by the socket
	 * manager. This is any msgs read that was not sent via the sendMessage
	 * method.
	 * 
	 * @param handler
	 */
	public void setHandler(SocketManagerHandler handler) {
		this.handler = handler;
		if (bufferedInput != null)
			bufferedInput.setStreamListener(handler);
		if (bufferedOutput != null)
			bufferedOutput.setStreamListener(handler);
	}

	/**
	 * Sets the buffer size
	 * 
	 * @param size
	 *            the size of the buffer
	 */
	public void setBufferSize(int size) {
		this.bufferSize = size;
	}

	/**
	 * Sets the chunk size
	 * 
	 * @param size
	 *            the size of a chunk
	 */
	public void setChunkSize(int size) {
		this.chunkSize = size;
	}

	/**
	 * Send a two-way message and block the calling thread until the msg reply
	 * is received. This enques the msg to the sendQueue, places the msg in the
	 * replyMap and waits on the msg. The msg is notified by the read task
	 * thread when it finds a msg with a msgID that maps to the msg in the
	 * msgReply map.
	 * 
	 * @param msg
	 *            the request msg to send
	 * @throws Exception
	 *             thrown if the reply message has an error value
	 */
	public void sendMessage(BaseMsg msg) throws Exception {
		internalSendMessage(msg, true);
		if (msg.error != null) {
			if (trace)
				log.trace("sendMessage will throw error", msg.error);
			throw msg.error;
		}
	}

	/**
	 * Send a reply.
	 * 
	 * @param msg
	 *            the message
	 * @throws Exception
	 *             for any error
	 */
	public void sendReply(BaseMsg msg) throws Exception {
		msg.trimReply();
		internalSendMessage(msg, false);
	}

	/**
	 * Send a one-way.
	 * 
	 * @param msg
	 *            the message
	 * @throws Exception
	 *             for any error
	 */
	public void sendOneWay(BaseMsg msg) throws Exception {
		msg.getMsgID();
		internalSendMessage(msg, false);
	}

	/**
	 * This places the msg into the sendQueue and returns if waitOnReply is
	 * false, or enques the msg to the sendQueue, places the msg in the replyMap
	 * and waits on the msg.
	 * 
	 * @param msg
	 * @param waitOnReply
	 * @throws Exception
	 */
	private void internalSendMessage(BaseMsg msg, boolean waitOnReply)
			throws Exception {
		if (running.get() == false)
			throw new IOException("Client is not connected");

		if (waitOnReply) { // Send a request msg and wait for the reply
			synchronized (msg) {
				// Create the request msgID
				msg.getMsgID();
				if (trace)
					log.trace("Begin internalSendMessage, round-trip msg="
							+ msg);
				// Place the msg into the write queue and reply map
				replyMap.put(msg, msg);
				sendQueue.put(msg);
				// Wait for the msg reply
				msg.wait();
			}
		} else { // Send an asynchronous msg, typically a reply
			if (trace)
				log.trace("Begin internalSendMessage, one-way msg=" + msg);
			sendQueue.put(msg);
		}
		if (trace)
			log.trace("End internalSendMessage, msg=" + msg);
	}

	/**
	 * The task managing the socket read thread
	 * 
	 */
	public class ReadTask implements Runnable {
		public void run() {
			int msgType = 0;
			log.debug("Begin ReadTask.run");
			try {
				bufferedInput = new NotifyingBufferedInputStream(
						socket.getInputStream(), bufferSize, chunkSize, handler);
				in = new ObjectInputStream(bufferedInput);
				log.debug("Created ObjectInputStream");
			} catch (IOException e) {
				handleStop("Failed to create ObjectInputStream", e);
				return;
			}

			while (true) {
				try {
					msgType = in.readByte();
					int msgID = in.readInt();
					if (trace)
						log.trace("Read msgType: " + BaseMsg.toString(msgType)
								+ ", msgID: " + msgID);
					// See if there is a msg awaiting a reply
					BaseMsg key = new BaseMsg(msgType, msgID);
					BaseMsg msg = (BaseMsg) replyMap.remove(key);
					if (msg == null) {
						msg = BaseMsg.createMsg(msgType);
						msg.setMsgID(msgID);
						msg.read(in);
						if (trace)
							log.trace("Read new msg: " + msg);

						// Handle the message
						if (pool == null)
							break;
						msg.setHandler(this);
						pool.execute(msg);
					} else {
						if (trace)
							log.trace("Found replyMap msg: " + msg);
						msg.setMsgID(msgID);
						try {
							msg.read(in);
							if (trace)
								log.trace("Read msg reply: " + msg);
						} catch (Throwable e) {
							// Forward the error to the waiting message
							msg.setError(e);
							throw e;
						}
						// Always notify the waiting message
						finally {
							synchronized (msg) {
								msg.notify();
							}
						}
					}
				} catch (ClassNotFoundException e) {
					handleStop("Failed to read msgType:" + msgType, e);
					break;
				} catch (IOException e) {
					handleStop("Exiting on IOE", e);
					break;
				} catch (InterruptedException e) {
					handleStop("Exiting on interrupt", e);
					break;
				} catch (Throwable e) {
					handleStop("Exiting on unexpected error in read task", e);
					break;
				}
			}
			log.debug("End ReadTask.run");
		}

		/**
		 * Handle the message or respond with an error
		 */
		public void handleMsg(BaseMsg msg) {
			try {
				handler.handleMsg(msg);
			} catch (Throwable e) {
				if (e instanceof JMSException)
					log.trace("Failed to handle: " + msg.toString(), e);
				else if (e instanceof RuntimeException || e instanceof Error)
					log.error("Failed to handle: " + msg.toString(), e);
				else
					log.debug("Failed to handle: " + msg.toString(), e);
				msg.setError(e);
				try {
					internalSendMessage(msg, false);
				} catch (Exception ie) {
					log.debug("Failed to send error reply", ie);
				}
			}
		}

		/**
		 * Stop the read thread
		 */
		private void handleStop(String error, Throwable e) {
			synchronized (running) {
				readState = STOPPING;
				running.set(false);
			}

			if (e instanceof IOException || e instanceof InterruptedException) {
				if (trace)
					log.trace(error, e);
			} else
				log.debug(error, e);

			replyAll(e);
			if (handler != null) {
				handler.asynchFailure(error, e);
				handler.close();
			}

			synchronized (running) {
				readState = STOPPED;
				if (writeState == STARTED) {
					writeState = STOPPING;
					writeThread.interrupt();
					//writeThread.stop();
				}
			}

			try {
				in.close();
			} catch (Exception ignored) {
				if (trace)
					log.trace(ignored.getMessage(), ignored);
			}

			try {
				socket.close();
			} catch (Exception ignored) {
				if (trace)
					log.trace(ignored.getMessage(), ignored);
			}
		}

		private void replyAll(Throwable e) {
			// Clear the interrupted state of the thread
			Thread.interrupted();

			for (Iterator iterator = replyMap.keySet().iterator(); iterator
					.hasNext();) {
				BaseMsg msg = (BaseMsg) iterator.next();
				msg.setError(e);
				synchronized (msg) {
					msg.notify();
				}
				iterator.remove();
			}
		}
	}

	/**
	 * The task managing the socket write thread
	 * 
	 */
	public class WriteTask implements Runnable {
		public void run() {
			log.debug("Begin WriteTask.run");
			try {
				bufferedOutput = new NotifyingBufferedOutputStream(
						socket.getOutputStream(), bufferSize, chunkSize,
						handler);
				out = new ObjectOutputStream(bufferedOutput);
				log.debug("Created ObjectOutputStream");
			} catch (IOException e) {
				handleStop(null, "Failed to create ObjectOutputStream", e);
				return;
			}
			log.info("----------------Sujit STARTED writeState : " + STARTED+"writeState"+writeState);
			while (true) {
				BaseMsg msg = null;

				synchronized (running) {
					if (writeState != STARTED)
						break;
				}
				try {
					
					msg = (BaseMsg) sendQueue.poll(10000l);
					if (msg == null)
						continue; // Check for stop if no message for 10 seconds
					if (trace)
						log.info("----------------Sujit Write msg: " + msg);
					msg.write(out);
					out.reset();
					out.flush();
				} catch (InterruptedException e) {
					handleStop(msg, "WriteTask was interrupted", e);
					break;
				} catch (IOException e) {
					handleStop(msg, "Exiting on IOE", e);
					break;
				} catch (Throwable e) {
					handleStop(msg, "Failed to write msgType:" + msg, e);
					break;
				}
			}
			log.debug("End WriteTask.run");
		}

		/**
		 * Stop the write thread
		 */
		private void handleStop(BaseMsg msg, String error, Throwable e) {
			synchronized (running) {
				writeState = STOPPING;
				running.set(false);
			}

			if (e instanceof InterruptedException || e instanceof IOException) {
				if (trace)
					log.trace(error, e);
			} else
				log.debug(error, e);

			if (msg != null) {
				msg.setError(e);
				synchronized (msg) {
					msg.notify();
				}
			}

			synchronized (running) {
				writeState = STOPPED;
				if (readState == STARTED) {	
					readState = STOPPING;
					readThread.interrupt();
					//readThread.stop();
				}
			}

			try {
				out.close();
				log.info("-------sujit out closed--------");
			} catch (Exception ignored) {
				if (trace)
					log.trace(ignored.getMessage(), ignored);
			}

			try {
				socket.close();
				log.info("-------sujit socket closed--------");
			} catch (Exception ignored) {
				if (trace)
					log.trace(ignored.getMessage(), ignored);
			}
		}
	}

	static class UILThreadFactory implements ThreadFactory {
		private String id;
		private int count;

		UILThreadFactory(String id) {
			this.id = id;
		}

		public Thread newThread(Runnable command) {
			synchronized (this) {
				count++;
			}
			Thread t = new Thread(command, "UIL2(" + id + ")#" + count);
			return t;
		}
	}
}

                
> JBossMQ UIL2 Read/Write Tasks should not be doing initialization
> ----------------------------------------------------------------
>
>                 Key: JBAS-4526
>                 URL: https://issues.jboss.org/browse/JBAS-4526
>             Project: Application Server 3  4  5 and 6
>          Issue Type: Task
>      Security Level: Public(Everyone can see) 
>          Components: JMS (JBossMQ)
>    Affects Versions: JBossAS-4.2.1.GA
>            Reporter: Adrian Brock
>            Assignee: Adrian Brock
>             Fix For: JBossAS-4.2.2.GA, JBossAS-4.4.0.CR1, JBossAS-5.0.0.CR1
>
>
> The JBossMQ UIL2 Read/Write tasks both do their stream initialization when each thread starts.
> This code needs changing such that the streams are allocated before starting the threads.
> Only if all resources can be succesfully allocated should we start the threads.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.jboss.org/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        


More information about the jboss-jira mailing list