[jboss-svn-commits] JBL Code SVN: r11099 - labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Apr 18 12:57:30 EDT 2007


Author: driedtoast
Date: 2007-04-18 12:57:30 -0400 (Wed, 18 Apr 2007)
New Revision: 11099

Added:
   labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractListener.java
   labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/HibernateGatewayListener.java
Log:


Added: labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractListener.java	                        (rev 0)
+++ labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractListener.java	2007-04-18 16:57:30 UTC (rev 11099)
@@ -0,0 +1,236 @@
+package org.jboss.soa.esb.listeners.gateway;
+
+import java.lang.reflect.Method;
+import java.util.Collection;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.ListenerUtil;
+import org.jboss.soa.esb.listeners.RegistryUtil;
+import org.jboss.soa.esb.listeners.lifecycle.AbstractThreadedManagedLifecycle;
+import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
+import org.jboss.soa.esb.listeners.message.MessageComposer;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.RegistryException;
+
+/**
+ * Used to provide a base for creating listeners for the bus.
+ * Takes care of the composer, run and initialization methods
+ * 
+ * @author danielmarchant
+ *
+ */
+public abstract class AbstractListener extends AbstractThreadedManagedLifecycle{
+
+	/**
+	 * Builds the listener from its configuration.
+	 * <p>
+	 * {@link #initListener(ConfigTree)} is overridable and is invoked from this
+	 * constructor, so a subclass override runs before that subclass's own field
+	 * initialisers have executed.
+	 *
+	 * @param config listener configuration, passed to the superclass constructor
+	 *        and then to {@link #initListener(ConfigTree)}
+	 * @throws ConfigurationException propagated from initialisation
+	 */
+	protected AbstractListener(ConfigTree config) throws ConfigurationException {
+		super(config);
+		this.initListener(config);
+	}
+
+	
+	protected void initListener(ConfigTree config) throws ConfigurationException {
+		
+		setTargetServiceCategory(ListenerUtil.obtainAtt(config,
+				ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null));
+		setTargetServiceName(ListenerUtil.obtainAtt(config,
+			ListenerTagNames.TARGET_SERVICE_NAME_TAG, null));
+		
+		createComposer(config);
+		
+	}
+	
+	protected void createComposer(ConfigTree config) throws ConfigurationException
+	{
+		try
+		{
+            String sProcessMethod = null;
+            _composerName = config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+            if (null != _composerName)
+            { // class attribute
+                _composerClass = Class.forName(_composerName);
+                _composer = (MessageComposer)_composerClass.newInstance();
+                sProcessMethod = config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG, "compose");
+            }
+			else
+			{
+				sProcessMethod = createDefaultComposer(config);
+				_logger
+						.debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG + "> element found in configuration" + " -  Using default composer class : " + _composerName);
+			}
+	
+			_processMethod = _composerClass.getMethod(sProcessMethod,
+					new Class[] { Object.class });
+		}
+		catch (Exception ex)
+		{
+			throw new ConfigurationException(ex);
+		}
+	} // ________________________________
+	
+	/**
+	 * Supplies the composer to use when the configuration names no composer
+	 * class.
+	 * <p>
+	 * {@link #createComposer(ConfigTree)} looks the returned method name up on
+	 * {@code _composerClass} immediately after this call and later invokes it
+	 * on {@code _composer}, so implementations must assign both fields (and
+	 * {@code _composerName}, which is written to the debug log).
+	 * 
+	 * @param config listener configuration
+	 * @return name of the composer method to invoke; it must accept a single
+	 *         {@code Object} argument
+	 */
+	protected abstract String createDefaultComposer(ConfigTree config);
+	
+	
+	/**
+	 * Called at the start of every {@link #doRun()} iteration to obtain the
+	 * object that is handed to the composer method.
+	 *
+	 * @return the object to convert into a message
+	 */
+	protected abstract Object doPreSend();
+	/**
+	 * Hook invoked by {@link #doRun()} after each delivery attempt, whether or
+	 * not the attempt succeeded. The default implementation is a no-op.
+	 *
+	 * @param msg the message that was handed to the courier
+	 */
+	protected void doPostSend(Message msg) {
+		// intentionally empty: subclasses may override
+	}
+	
+	@Override
+	protected void doRun() {
+		if (_logger.isDebugEnabled())
+        {
+            _logger.debug("run() method of " + this.getClass().getSimpleName() +
+                " started on thread " + Thread.currentThread().getName());
+        }
+		do {
+			
+			try {
+				
+				Object objToConvert = doPreSend();
+				Object obj = _processMethod.invoke(_composer,
+                         new Object[]{objToConvert});
+                 if (null == obj) {
+                     _logger.warn("Action class method <" + _processMethod
+                             .getName() + "> returned a null object");
+                     continue;
+                 }
+                 
+					boolean bSent = false;
+					try {
+					   
+	                   for (EPR current : _targetEprs) {
+	                       Courier _courier;
+	                       Message msg = (Message) obj;
+							_courier = CourierFactory.getCourier(current);
+						   try {
+	                           if (bSent = _courier
+	                                   .deliver((Message) obj)) {
+	                               break;
+	                           }
+	                       }
+	                       finally {
+	                           CourierUtil.cleanCourier(_courier);
+	                           doPostSend(msg);
+	                       }
+	                   }
+					} catch (CourierException e) {
+						_logger.error(" Courier is not found for target ", e);
+					} catch (MalformedEPRException e) {
+						_logger.error("Problems with the EPR  ", e);
+					}
+                    
+                   if (!bSent) {
+                       String text = "Target service <" + _targetServiceCategory + "," + _targetServiceName + "> is not registered";
+                       throw new Exception(text);
+                   }
+			
+			} catch (Exception e) {
+				_logger.error("Unexpected error has occurred",e);
+			}
+		} while (isRunning());
+	}
+	
+	
+	
+
+	
+	
+	@Override
+	protected void doInitialise() throws ManagedLifecycleException {
+		try
+        {
+            _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,_targetServiceName);
+            if (null == _targetEprs || _targetEprs.size() < 1)
+                throw new ManagedLifecycleException("EPR <" + _targetServiceName + "> not found in registry") ;
+        }
+        catch (final RegistryException re)
+        {
+            throw new ManagedLifecycleException("Unexpected registry exception", re) ;
+        }
+
+	}
+	
+	/** @return category of the service this listener delivers to */
+	public String getTargetServiceCategory() {
+		return this._targetServiceCategory;
+	}
+
+	/** @param serviceCategory category of the service to deliver to */
+	public void setTargetServiceCategory(String serviceCategory) {
+		this._targetServiceCategory = serviceCategory;
+	}
+
+	/** @return name of the service this listener delivers to */
+	public String getTargetServiceName() {
+		return this._targetServiceName;
+	}
+
+	/** @param serviceName name of the service to deliver to */
+	public void setTargetServiceName(String serviceName) {
+		this._targetServiceName = serviceName;
+	}
+
+	/** @return the composer instance the process method is invoked on */
+	public MessageComposer getComposer() {
+		return this._composer;
+	}
+
+	/** @param _composer the composer instance to invoke the process method on */
+	public void setComposer(MessageComposer _composer) {
+		this._composer = _composer;
+	}
+
+	/** @return the class the process method is looked up on */
+	public Class getComposerClass() {
+		return this._composerClass;
+	}
+
+	/** @param class1 the class to look the process method up on */
+	public void setComposerClass(Class class1) {
+		this._composerClass = class1;
+	}
+
+	/** @return the composer class name */
+	public String getComposerName() {
+		return this._composerName;
+	}
+
+	/** @param name the composer class name */
+	public void setComposerName(String name) {
+		this._composerName = name;
+	}
+
+	/** @return the EPRs that messages are delivered to */
+	public Collection<EPR> getTargetEprs() {
+		return this._targetEprs;
+	}
+
+	/** @param eprs the EPRs that messages are delivered to */
+	public void setTargetEprs(Collection<EPR> eprs) {
+		this._targetEprs = eprs;
+	}
+	
+	
+	/** Delivery targets; filled in by doInitialise() or setTargetEprs(). */
+	protected Collection<EPR> _targetEprs;
+	/** Category of the target service, read from the configuration. */
+	protected String _targetServiceCategory;
+	/** Name of the target service, read from the configuration. */
+	protected String _targetServiceName;
+	/** Class name of the composer. */
+	protected String _composerName;
+	/** Composer instance that the process method is invoked on. */
+	protected MessageComposer _composer;
+	/** Class on which the process method is looked up. */
+	protected Class _composerClass;
+	/** Composer method invoked reflectively from doRun(). */
+	protected Method _processMethod;
+	private static final Logger _logger = Logger.getLogger(AbstractListener.class);
+
+
+	
+}

Added: labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/HibernateGatewayListener.java
===================================================================
--- labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/HibernateGatewayListener.java	                        (rev 0)
+++ labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/HibernateGatewayListener.java	2007-04-18 16:57:30 UTC (rev 11099)
@@ -0,0 +1,256 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.soa.esb.listeners.gateway;
+
+import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+import org.hibernate.HibernateException;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.event.AbstractEvent;
+import org.hibernate.event.DeleteEvent;
+import org.hibernate.event.DeleteEventListener;
+import org.hibernate.event.EventListeners;
+import org.hibernate.event.EventSource;
+import org.hibernate.event.LoadEvent;
+import org.hibernate.event.LoadEventListener;
+import org.hibernate.event.SaveOrUpdateEvent;
+import org.hibernate.event.SaveOrUpdateEventListener;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.message.AbstractMessageComposer;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.message.Message;
+
+/**
+ * Listens for events in hibernate and adds a message to the bus 
+ * for processing.
+ * 
+ * 
+ * @author danielmarchant
+ *
+ */
+public class HibernateGatewayListener extends AbstractListener 
+					implements DeleteEventListener, 
+								SaveOrUpdateEventListener,
+								LoadEventListener {
+
+	
+	
+	
+	/**
+	 * Creates the gateway; all set-up work is done by the superclass
+	 * constructor, which in turn calls {@link #initListener(ConfigTree)}.
+	 *
+	 * @param config listener configuration
+	 * @throws ConfigurationException propagated from the superclass constructor
+	 */
+	protected HibernateGatewayListener(final ConfigTree config) throws ConfigurationException {
+		super(config);
+	}
+
+	
+	/**
+	 * Installs {@link HibernateMessageComposer} as the composer.
+	 *
+	 * @param config listener configuration (not used here)
+	 * @return the composer method name, "compose"
+	 */
+	@Override
+	protected String createDefaultComposer(ConfigTree config) {
+		_composer = new HibernateMessageComposer();
+		_composerClass = HibernateMessageComposer.class;
+		_composerName = HibernateMessageComposer.class.getName();
+		return "compose";
+	}
+
+
+
+
+
+	/**
+	 * Blocks until a Hibernate event is available on the queue.
+	 *
+	 * @return the next queued event, or {@code null} if the wait was
+	 *         interrupted
+	 */
+	@Override
+	protected Object doPreSend() {
+		Object next = null;
+		try {
+			next = queue.take();
+		} catch (InterruptedException e) {
+			_logger.warn(" interrupted while getting event off queue " , e);
+		}
+		return next;
+	}
+	
+	
+	
+	
+	/**
+	 * Performs the base initialisation and then appends this gateway to the
+	 * delete, save-or-update and load event listeners of a Hibernate
+	 * {@link Configuration}.
+	 * <p>
+	 * This method is called from the superclass constructor, i.e. before the
+	 * instance fields of this class ({@code _logger}, {@code queue}) have been
+	 * initialised, so it must not touch them.
+	 *
+	 * @param config listener configuration
+	 * @throws ConfigurationException propagated from the base initialisation
+	 */
+	@Override
+	protected void initListener(ConfigTree config) throws ConfigurationException {
+		// Without the base initialisation the target service attributes are never
+		// read and no composer/process method is created, so doRun() could never
+		// compose a message and createDefaultComposer() would be dead code.
+		super.initListener(config);
+
+		// NOTE(review): this Configuration is local and is not stored or handed
+		// on anywhere in this method; presumably the listeners should be added
+		// to the Configuration that builds the application's SessionFactory.
+		// Confirm how this instance is meant to be shared.
+		Configuration cfg = new Configuration();
+		EventListeners listeners = cfg.getEventListeners();
+
+		// add as delete listener
+		DeleteEventListener[] currentDelete = listeners.getDeleteEventListeners();
+		int deleteCount = (currentDelete == null) ? 0 : currentDelete.length;
+		DeleteEventListener[] dlisteners = new DeleteEventListener[deleteCount + 1];
+		if (deleteCount > 0) {
+			System.arraycopy(currentDelete, 0, dlisteners, 0, deleteCount);
+		}
+		dlisteners[deleteCount] = this;
+		listeners.setDeleteEventListeners(dlisteners);
+
+		// add as save or update listener; the existing save listeners are the
+		// source of the copy (the delete listener array was used here before)
+		SaveOrUpdateEventListener[] currentSave = listeners.getSaveEventListeners();
+		int saveCount = (currentSave == null) ? 0 : currentSave.length;
+		SaveOrUpdateEventListener[] slisteners = new SaveOrUpdateEventListener[saveCount + 1];
+		if (saveCount > 0) {
+			System.arraycopy(currentSave, 0, slisteners, 0, saveCount);
+		}
+		slisteners[saveCount] = this;
+		listeners.setSaveEventListeners(slisteners);
+
+		// add as load listener
+		LoadEventListener[] currentLoad = listeners.getLoadEventListeners();
+		int loadCount = (currentLoad == null) ? 0 : currentLoad.length;
+		LoadEventListener[] llisteners = new LoadEventListener[loadCount + 1];
+		if (loadCount > 0) {
+			System.arraycopy(currentLoad, 0, llisteners, 0, loadCount);
+		}
+		llisteners[loadCount] = this;
+		listeners.setLoadEventListeners(llisteners);
+
+		// TODO post listeners
+		// TODO pre  listeners
+	}
+     
+	
+	
+
+	
+	
+	
+	/** Events received from Hibernate, waiting to be picked up by doPreSend(). */
+	protected BlockingQueue<AbstractEvent> queue = new LinkedBlockingQueue<AbstractEvent>();
+	/** Logger used by the event callbacks and doPreSend(). */
+	protected Logger _logger = Logger.getLogger(HibernateGatewayListener.class);
+	
+	// Hibernate based event methods......
+	
+	
+	/**
+	 * Hibernate callback: queues the delete event for the listener thread.
+	 * <p>
+	 * If the calling thread is interrupted the event is not queued; the
+	 * interrupt status is restored so the caller can still observe it.
+	 *
+	 * @param event the delete event raised by Hibernate
+	 * @throws HibernateException declared by the listener interface; not
+	 *         thrown by this implementation
+	 */
+	public void onDelete(DeleteEvent event) throws HibernateException {
+		if(_logger.isDebugEnabled()) {
+			_logger.debug(" Receiving delete event: " + event);
+		}
+		try {
+			queue.put(event);
+		} catch (InterruptedException e) {
+			_logger.info(" Event was interrupted onDelete on adding to queue",e);
+			// this runs on the caller's thread: do not swallow its interrupt
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/**
+	 * Hibernate callback: queues the save-or-update event for the listener
+	 * thread.
+	 * <p>
+	 * If the calling thread is interrupted the event is not queued; the
+	 * interrupt status is restored so the caller can still observe it.
+	 *
+	 * @param event the save-or-update event raised by Hibernate
+	 * @throws HibernateException declared by the listener interface; not
+	 *         thrown by this implementation
+	 */
+	public void onSaveOrUpdate(SaveOrUpdateEvent event) throws HibernateException {
+		if(_logger.isDebugEnabled()) {
+			_logger.debug(" Receiving save or update event: " + event);
+		}
+		try {
+			queue.put(event);
+		} catch (InterruptedException e) {
+			_logger.info(" Event was interrupted onSaveOrUpdate on adding to queue",e);
+			// this runs on the caller's thread: do not swallow its interrupt
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/**
+	 * Hibernate callback: queues the load event for the listener thread. The
+	 * load type is currently ignored.
+	 * <p>
+	 * If the calling thread is interrupted the event is not queued; the
+	 * interrupt status is restored so the caller can still observe it.
+	 *
+	 * @param event the load event raised by Hibernate
+	 * @param type the kind of load being performed (unused)
+	 * @throws HibernateException declared by the listener interface; not
+	 *         thrown by this implementation
+	 */
+	public void onLoad(LoadEvent event, LoadType type) throws HibernateException {
+		// TODO do something with the type : type.getName()
+		if(_logger.isDebugEnabled()) {
+			_logger.debug(" Receiving load event: " + event);
+		}
+		try {
+			queue.put(event);
+		} catch (InterruptedException e) {
+			_logger.info(" Event was interrupted onLoad on adding to queue",e);
+			// this runs on the caller's thread: do not swallow its interrupt
+			Thread.currentThread().interrupt();
+		}
+	}
+	
+	
+	
+	
+	
+	/**
+	 * Default composer: copies the details of a Hibernate delete,
+	 * save-or-update or load event into the properties of an ESB message.
+	 */
+	public static class HibernateMessageComposer extends AbstractMessageComposer {
+
+		/**
+		 * Sets the properties "eventSource", "objectInstance", "entityId",
+		 * "requestId", "entityName" and "type" on the message, each only when
+		 * the event supplies a non-null value. Objects that are none of the
+		 * three supported event types leave the message untouched.
+		 *
+		 * @param message the message to populate
+		 * @param obj the Hibernate event taken off the listener queue
+		 * @throws MessageDeliverException declared by the superclass; not
+		 *         thrown directly by this method
+		 */
+		@Override
+		protected void populateMessage(Message message, Object obj) throws MessageDeliverException {
+			EventSource session = null;
+			Object entity = null;
+			Serializable id = null;
+			Serializable requestedId = null;
+			String name = null;
+			String eventType = null;
+			// NOTE(review): the load result is read below but never attached to
+			// the message - confirm whether a property for it is missing.
+			Object loadResult = null;
+
+			if (obj instanceof DeleteEvent) {
+				final DeleteEvent deleteEvent = (DeleteEvent) obj;
+				name = deleteEvent.getEntityName();
+				entity = deleteEvent.getObject();
+				session = deleteEvent.getSession();
+				eventType = "delete";
+			} else if (obj instanceof SaveOrUpdateEvent) {
+				final SaveOrUpdateEvent saveEvent = (SaveOrUpdateEvent) obj;
+				name = saveEvent.getEntityName();
+				entity = saveEvent.getObject();
+				session = saveEvent.getSession();
+				id = saveEvent.getResultId();
+				requestedId = saveEvent.getRequestedId();
+				eventType = "saveOrUpdate";
+			} else if (obj instanceof LoadEvent) {
+				final LoadEvent loadEvent = (LoadEvent) obj;
+				entity = loadEvent.getInstanceToLoad();
+				name = loadEvent.getEntityClassName();
+				id = loadEvent.getEntityId();
+				loadResult = loadEvent.getResult();
+				session = loadEvent.getSession();
+				eventType = "load";
+			}
+
+			if (session != null) {
+				message.getProperties().setProperty("eventSource", session);
+			}
+			if (entity != null) {
+				message.getProperties().setProperty("objectInstance", entity);
+			}
+			if (id != null) {
+				message.getProperties().setProperty("entityId", id);
+			}
+			if (requestedId != null) {
+				message.getProperties().setProperty("requestId", requestedId);
+			}
+			if (name != null) {
+				message.getProperties().setProperty("entityName", name);
+			}
+			if (eventType != null) {
+				message.getProperties().setProperty("type", eventType);
+			}
+		}
+
+		/**
+		 * No configuration is needed by this composer.
+		 *
+		 * @param config ignored
+		 */
+		public void setConfiguration(ConfigTree config) {
+			// nothing to configure
+		}
+
+	}
+
+
+
+
+
+	
+	
+}




More information about the jboss-svn-commits mailing list