JavaXT
|
|
ThreadPool Classpackage javaxt.utils; import java.util.*; import java.util.concurrent.ConcurrentHashMap; //****************************************************************************** //** ThreadPool //****************************************************************************** /** * Used to spawn threads and execute tasks. Instances of this class should * override the process() method which is called by individual threads. * Objects (e.g. data) are added to a queue via the add() method. Individual * threads wait for data to be added to the queue. As new data is added, * it is assigned to a thread to process. The thread, in turn, calls the * process() method. The ThreadPool will run indefinitely, unless the done() * method is called. The join() method can be used to join the ThreadPool * to the caller's thread. Example: * <pre> //Instantiate thread pool int numThreads = 2; ThreadPool pool = new ThreadPool(numThreads){ public void process(Object obj){ //Do something! } }; //Start the thread pool.start(); //Add tiles to the pool for (int i : new int[]{1,2,3,4,5,6,7,8,9,10}){ pool.add(i); } //Notify the pool that we have finished added records pool.done(); //Wait for threads to finish pool.join(); </pre> * ******************************************************************************/ public class ThreadPool { private int numThreads; private Integer maxPoolSize; private final ArrayList<Thread> threads; private final ArrayList<Thread> activeThreads; private final List pool; private class Return{} private ConcurrentHashMap<Long, HashMap<String, Object>> params; //************************************************************************** //** Constructor //************************************************************************** public ThreadPool(int numThreads, Integer maxPoolSize){ this.numThreads = numThreads; threads = new ArrayList<>(); activeThreads = new ArrayList<>(); pool = new LinkedList(); if (maxPoolSize!=null){ if (maxPoolSize<1) maxPoolSize = null; } this.maxPoolSize = maxPoolSize; params = new ConcurrentHashMap<>(); } //************************************************************************** //** Constructor //************************************************************************** public ThreadPool(int numThreads){ this(numThreads, null); } //************************************************************************** //** start //************************************************************************** /** Used to initialize threads in the pool */ public ThreadPool start(){ threads.clear(); for (int i=0; i<numThreads; i++){ Thread t = new Thread(){ public void run(){ while (true) { Object obj; synchronized (pool) { while (pool.isEmpty()) { try { pool.wait(); } catch (InterruptedException e) { return; } } obj = pool.remove(0); pool.notifyAll(); } if ((obj instanceof Return)){ //Remove thread from the activeThreads array synchronized(activeThreads){ for (int i=0; i<activeThreads.size(); i++){ if (activeThreads.get(i).getId()==this.getId()){ activeThreads.remove(i); break; } } //Add the object back to the pool for other threads to process if (!activeThreads.isEmpty()) add(obj); activeThreads.notify(); } //Call the exit() callback and return exit(); return; } else{ try{ //Call the process() callback process(obj); } catch(Exception e){ //Remove thread from the activeThreads array synchronized(activeThreads){ for (int i=0; i<activeThreads.size(); i++){ if (activeThreads.get(i).getId()==this.getId()){ activeThreads.remove(i); break; } } activeThreads.notify(); } //Throw exception throw e; } } } } }; synchronized(activeThreads){ activeThreads.add(t); activeThreads.notify(); } t.start(); threads.add(t); } return this; } //************************************************************************** //** process //************************************************************************** /** Called whenever a thread gets an object to process */ public void process(Object obj){} //************************************************************************** //** exit //************************************************************************** /** Called when a thread is being disposed */ public void exit(){} //************************************************************************** //** get //************************************************************************** /** Returns a variable for an individual thread */ public Object get(String key){ return get(key, null); } //************************************************************************** //** get //************************************************************************** /** Returns a variable associated with an individual thread * @param key The name of the variable * @param setter Optional. Used to generate a value if the value has not * been set. The following example shows how to call the get method with * a setter as a lamba expression to return a new database connection: <pre> Connection conn = (Connection) get("conn", () -> { return database.getConnection(); }); </pre> */ public Object get(String key, Setter setter) { long id = getThreadID(); Object val = null; synchronized(params){ HashMap<String, Object> map = params.get(id); if (map==null && setter!=null){ map = new HashMap<>(); params.put(id, map); } if (map!=null){ val = map.get(key); if (val==null && setter!=null){ try{ val = setter.getValue(); if (val==null) throw new Exception("Setter cannot return a null value"); map.put(key, val); } catch(Exception e){ throw new RuntimeException(e); } } } params.notifyAll(); } return val; } //************************************************************************** //** set //************************************************************************** /** Used to set a variable for an individual thread */ public void set(String key, Object value){ long id = getThreadID(); synchronized(params){ HashMap<String, Object> map = params.get(id); if (map==null){ map = new HashMap<>(); params.put(id, map); } map.put(key, value); params.notifyAll(); } } //************************************************************************** //** getThreadID //************************************************************************** private long getThreadID(){ long id = Thread.currentThread().getId(); for (Thread t : threads){ if (id==t.getId()){ return id; } } ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); for (Thread t : threads){ if (t.getThreadGroup().parentOf(threadGroup)){ return t.getId(); } } //ThreadGroup parentGroup = threadGroup.getParent(); throw new RuntimeException("Thread not found"); } //************************************************************************** //** Setter Interface //************************************************************************** public static interface Setter { public Object getValue() throws Exception; } //************************************************************************** //** add //************************************************************************** /** Used to add an object to the pool to process * @return Integer representing the position in the queue */ public int add(Object object){ int idx; synchronized (pool) { if (maxPoolSize!=null){ while (pool.size()>maxPoolSize){ try{ pool.wait(); } catch(java.lang.InterruptedException e){ break; } } } idx = pool.size(); pool.add(object); pool.notify(); } return idx; } //************************************************************************** //** getActiveThreadCount //************************************************************************** /** Returns the number of active threads in the pool */ public int getActiveThreadCount(){ int count; synchronized(activeThreads){ count = activeThreads.size(); } return count; } //************************************************************************** //** getQueue //************************************************************************** /** Returns a handle to the job queue. Use with care. Be sure to add a * synchronized block when processing or iterating through items in the * queue. */ public List getQueue(){ return pool; } //************************************************************************** //** done //************************************************************************** /** Used to notify the threads that we are done adding objects to the pool * and to exit when ready */ public void done(){ add(new Return()); } //************************************************************************** //** join //************************************************************************** /** Used to wait for threads to complete */ public void join() throws InterruptedException { for (Thread thread : threads){ thread.join(); } } } |