1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務時間:8:30-17:00
      你可能遇到了下面的問題
      關閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
      Java線程池架構(gòu)中的多線程調(diào)度器是什么

      本篇文章為大家展示了Java線程池架構(gòu)中的多線程調(diào)度器是什么,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

      昌平網(wǎng)站建設公司創(chuàng)新互聯(lián)建站,昌平網(wǎng)站設計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為昌平超過千家提供企業(yè)網(wǎng)站建設服務。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站建設要多少錢,請找那個售后服務好的昌平做網(wǎng)站的公司定做!

      我們?nèi)绻胘ava默認的線程池來做調(diào)度器,一種選擇就是Timer和TimerTask的結(jié)合,在以前的文章:《Timer與 TimerTask的真正原理&使用介紹》中有明確的說明:一個Timer為一個單獨的線程,雖然一個Timer可以調(diào)度多個 TimerTask,但是對于一個Timer來講是串行的,至于細節(jié)請參看對應的那篇文章的內(nèi)容,本文介紹的多線程調(diào)度器,也就是定時任務,基于多線程調(diào) 度完成,當然你可以為了完成多線程使用多個Timer,只是這些Timer的管理需要你來完成,不是一個框架體系,而 ScheduleThreadPoolExecutor提供了這個功能,所以我們第一要搞清楚是如何使用調(diào)度器的,其次是需要知道它的內(nèi)部原理是什么,也 就是知其然,再知其所以然!

      首先如果我們要創(chuàng)建一個基于java本身的調(diào)度池通常的方法是:

      Executors.newScheduledThreadPool(int); 當有重載方法,我們最常用的是這個就從這個,看下定義:public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {     return new ScheduledThreadPoolExecutor(corePoolSize); }

      其實內(nèi)部是new了一個實例化對象出來,并傳入大小,此時就跟蹤到ScheduledThreadPoolExecutor的構(gòu)造方法中:

      public ScheduledThreadPoolExecutor(int corePoolSize) {         super(corePoolSize, Integer.MAX_VALUE, 0,TimeUnit.NANOSECONDS,               new DelayedWorkQueue());  }

      你會發(fā)現(xiàn)調(diào)用了super,而super你跟蹤進去會發(fā)現(xiàn),是ThreadPoolExecutor中,那么 ScheduledThreadPoolExecutor和ThreadPoolExecutor有何區(qū)別,就是本文要說得重點了,首先我們留下個引子, 你發(fā)現(xiàn)在定義隊列的時候,不再是上文中提到的LinkedBlockingQueue,而是DelayedWorkQueue,那么細節(jié)上我們接下來就是 要講解的重點,既然他們又繼承關系,其實搞懂了不同點,就搞懂了共同點,而且有這樣的關系大多數(shù)應當是共同點,不同點的猜測:這個是要實現(xiàn)任務調(diào)度,任務 調(diào)度不是立即的,需要延遲和定期做等情況,那么是如何實現(xiàn)的呢?

      這就是我們需要思考的了,通過源碼考察,我們發(fā)現(xiàn),他們都有execute方法,只是ScheduledThreadPoolExecutor將源碼進行了重寫,并且還有以下四個調(diào)度器的方法:

      public ScheduledFuture schedule(Runnable command,                        long delay, TimeUnit unit);  public  ScheduledFuture schedule(Callable callable,                        long delay, TimeUnit unit);  public ScheduledFuture scheduleAtFixedRate(Runnable command,                           long initialDelay,                           long period,                           TimeUnit unit);  public ScheduledFuture scheduleWithFixedDelay(Runnable command,                              long initialDelay,                              long delay,                              TimeUnit unit);

      那么這四個方法有什么區(qū)別呢?其實第一個和第二個區(qū)別不大,一個是Runnable、一個是Callable,內(nèi)部包裝后是一樣的效果;所以把頭兩個方法幾乎當成一種調(diào)度,那么三種情況分別是:

      1、 進行一次延遲調(diào)度:延遲delay這么長時間,單位為:TimeUnit傳入的的一個基本單位,例如:TimeUnit.SECONDS屬于提供好的枚舉信息;(適合于方法1和方法2)。

      2、  多次調(diào)度,每次依照上一次預計調(diào)度時間進行調(diào)度,例如:延遲2s開始,5s一次,那么就是2、7、12、17,如果中間由于某種原因?qū)е戮€程不夠用,沒有 得到調(diào)度機會,那么接下來計算的時間會優(yōu)先計算進去,因為他的排序會被排在前面,有點類似Timer中的:scheduleAtFixedRate方法, 只是這里是多線程的,它的方法名也叫:scheduleAtFixedRate,所以這個是比較好記憶的(適合方法3)

      3、 多次調(diào)度,每次按照上一次實際執(zhí)行的時間進行計算下一次時間,同上,如果在第7秒沒有被得到調(diào)度,而是第9s才得到調(diào)度,那么計算下一次調(diào)度時間就不是12秒,而是9+5=14s,如果再次延遲,就會延遲一個周期以上,也就會出現(xiàn)少調(diào)用的情況(適合于方法3);

      4、 最后補充execute方法是一次調(diào)度,期望被立即調(diào)度,時間為空:

      public void execute(Runnable command) {  if (command == null)  throw new NullPointerException();  schedule(command, 0, TimeUnit.NANOSECONDS);  }

      我們簡單看看scheduleAtFixedRate、scheduleWithFixedDelay對下面的分析會更加有用途:

      public ScheduledFuture scheduleAtFixedRate(Runnable command,                                                   long initialDelay,                                                   long period,                                                   TimeUnit unit) {         if (command == null || unit == null)             throw new NullPointerException();         if (period <= 0)             throw new IllegalArgumentException();         RunnableScheduledFuture t = decorateTask(command,             new ScheduledFutureTask
           兩段源碼唯一的區(qū)別就是在unit.toNanos(int)這唯一一個地方,scheduleAtFixedRate里面是直接傳入值,而scheduleWithFixedDelay里面是取了相反數(shù),也就是假如我們都傳入正數(shù),scheduleWithFixedDelay其實就取反了,沒有任何區(qū)別,你是否聯(lián)想到前面文章介紹Timer中類似的處理手段通過正負數(shù)區(qū)分時間間隔方法,為0代表僅僅調(diào)度一次,其實在這里同樣是這樣的,他們也同樣有一個問題就是,如果你傳遞負數(shù),方法的功能正好是相反的。 而你會發(fā)現(xiàn),不論是那個schedule方法里頭,都會創(chuàng)建一個ScheduledFutureTask類的實例,此類究竟是何方神圣呢,我們來看看。  ScheduledFutureTask的類(ScheduleThreadPoolExecutor的私有的內(nèi)部類)來進行調(diào)度,那么可以看看內(nèi)部做了什么操作,如下:  1         ScheduledFutureTask(Runnable r, V result, long ns) {             super(r, result);             this.time = ns;             this.period = 0;             this.sequenceNumber = sequencer.getAndIncrement();         }          /**          * Creates a periodic action with given nano time and period.          */         ScheduledFutureTask(Runnable r, V result, long ns, long period) {             super(r, result);             this.time = ns;             this.period = period;             this.sequenceNumber = sequencer.getAndIncrement();         }          /**          * Creates a one-shot action with given nanoTime-based trigger.          */         ScheduledFutureTask(Callable callable, long ns) {             super(callable);             this.time = ns;             this.period = 0;             this.sequenceNumber = sequencer.getAndIncrement();         }

      最核心的幾個參數(shù)正好對應了調(diào)度的延遲的構(gòu)造方法,這些參數(shù)如何用起來的?那么它還提供了什么方法呢?

      public long getDelay(TimeUnit unit) {     return unit.convert(time - now(), TimeUnit.NANOSECONDS); }  public int compareTo(Delayed other) {     if (other == this) // compare zero ONLY if same object         return 0;     if (other instanceof ScheduledFutureTask) {         ScheduledFutureTask x = (ScheduledFutureTask)other;         long diff = time - x.time;         if (diff < 0)                     return -1;                 else if (diff > 0)             return 1;         else if (sequenceNumber < x.sequenceNumber)             return -1;         else             return 1;     }     long d = (getDelay(TimeUnit.NANOSECONDS) -               other.getDelay(TimeUnit.NANOSECONDS));     return (d == 0)? 0 : ((d < 0)? -1 : 1);         }         /**          * 返回是否為片段,也就是多次調(diào)度          *          */         public boolean isPeriodic() {             return period != 0;         }

      這里發(fā)現(xiàn)了,他們可以運行,且判定時間的方法是getDelay方法我們知道了。  對比時間的方法是:compareTo,傳入了參數(shù)類型為:Delayed類型,不難猜測出,ScheduledFutureTask和Delayed有 某種繼承關系,沒錯,ScheduledFutureTask實現(xiàn)了Delayed的接口,只是它是間接實現(xiàn)的;并且Delayed接口繼承了 Comparable接口,這個接口可用來干什么?看過我前面寫的一篇文章關于中文和對象排序的應該知道,這個是用來自定義對比和排序的,我們的調(diào)度任務 是一個對象,所以需要排序才行,接下來我們回溯到開始定義的代碼中,找一個實際調(diào)用的代碼來看看它是如何啟動到run方法的?如何排序的?如何調(diào)用延遲 的?就是我們下文中會提到的,而這里我們先提出問題,后文我們再來說明這些問題。 我們先來看下run方法的一些定義。

         /**            * 時間片類型任務執(zhí)行            */          private void runPeriodic() {             //運行對應的程序,這個是具體的程序   boolean ok = ScheduledFutureTask.super.runAndReset();             boolean down = isShutdown();             // Reschedule if not cancelled and not shutdown or policy allows             if (ok && (!down ||(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&!isStopped()))) {        long p = period;                      if (p > 0)//規(guī)定時間間隔算出下一次時間             time += p;         else//用當前時間算出下一次時間,負負得正             time = triggerTime(-p);         //計算下一次時間,并資深再次放入等待隊列中         ScheduledThreadPoolExecutor.super.getQueue().add(this);     }     else if (down)         interruptIdleWorkers(); }  /**  * 是否為逐片段執(zhí)行,如果不是,則調(diào)用父親類的run方法  */ public void run() {     if (isPeriodic())//周期任務         runPeriodic();     else//只執(zhí)行一次的任務         ScheduledFutureTask.super.run(); }

      可以看到run方法首先通過isPeriod()判定是否為時間片,判定的依據(jù)就是我們說的時間片是否“不為零”,如果不是周期任務,就直接運行一 次,如果是周期任務,則除了運行還會計算下一次執(zhí)行的時間,并將其再次放入等待隊列,這里對應到scheduleAtFixedRate、 scheduleWithFixedDelay這兩個方法一正一負,在這里得到判定,并且將為負數(shù)的取反回來,負負得正,java就是這么干的,呵呵,所 以不要認為什么是不可能的,只要好用什么都是可以的,然后計算的時間一個是基于標準的time加上一個時間片,一個是根據(jù)當前時間計算一個時間片,在上文 中我們已經(jīng)明確說明了兩者的區(qū)別。

      以:schedule方法為例:

      public  ScheduledFuture schedule(Callable callable,                                            long delay,                                            TimeUnit unit) {         if (callable == null || unit == null)             throw new NullPointerException();         RunnableScheduledFuture t = decorateTask(callable,             new ScheduledFutureTask(callable,                        triggerTime(delay, unit)));         delayedExecute(t);         return t; }

      其實這個方法內(nèi)部創(chuàng)建的就是一個我們剛才提到的:ScheduledFutureTask,外面又包裝了下叫做RunnableScheduledFuture,也就是適配了下而已,呵呵,代碼里面就是一個return操作,java這樣做的目的是方便子類去擴展。

      關鍵是delayedExecute(t)方法中做了什么?看名稱是延遲執(zhí)行的意思,難道java的線程可以延遲執(zhí)行,那所有的任務線程都在運行狀態(tài)?
      它的源碼是這樣的:

      private void delayedExecute(Runnable command) {     if (isShutdown()) {         reject(command);         return;     }     if (getPoolSize() < getCorePoolSize())         prestartCoreThread();      super.getQueue().add(command); }

      我們主要關心prestartCoreThread()和super.getQueue().add(command),因為如果系統(tǒng)關閉,這些討論都沒有意義的,我們分別叫他們第二小段代碼和第三小段代碼。

      第二個部分如果線程數(shù)小于核心線程數(shù)設置,那么就調(diào)用一個prestartCoreThread(),看方法名應該是:預先啟動一個核心線程的意思,先看完第三個部分,再跟蹤進去看源碼。

      第三個部分很明了,就是調(diào)用super.getQueue().add(command);也就是說直接將任務放入一個隊列中,其實super是什 么?super就是我們上一篇文章所提到的ThreadPoolExecutor,那么這個Queue就是上一篇文章中提到的等待隊列,也就是任何 schedule任務首先放入等待隊列,然后等待被調(diào)度的。

      public boolean prestartCoreThread() {     return addIfUnderCorePoolSize(null); } private boolean addIfUnderCorePoolSize(Runnable firstTask) {     Thread t = null;     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {         if (poolSize < corePoolSize && runState == RUNNING)                              t = addThread(firstTask);                 } finally {                               mainLock.unlock();                         }  if (t == null)                                  return false;                             t.start();                               return true;                  }

      這個代碼是否似曾相似,沒錯,這個你在上一篇文章介紹ThreadPoolExecutor的時候就見到過,說明不論是 ThreadPoolExecutor還是ScheduleThreadPoolExecutor他們的Thread都是由一個Worker來處理的(上 一篇文章有介紹),而這個Worker處理的基本機制就是將當前任務執(zhí)行后,不斷從線程等待隊列中獲取數(shù)據(jù),然后用以執(zhí)行,直到隊列為空為止。  那么他們的區(qū)別在哪里呢?延遲是如何實現(xiàn)的呢?和我們上面介紹的ScheduledFutureTask又有何關系呢?  那么我們回過頭來看看ScheduleThreadPool的定義是如何的。

      public ScheduledThreadPoolExecutor(int corePoolSize) {                     super(corePoolSize, Integer.MAX_VALUE, 0,TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }

      發(fā)現(xiàn)它和ThreadPoolExecutor有個定義上很大的區(qū)別就是,ThreadPoolExecutor用的是 LinkedBlockingQueue(當然可以修改),它用的是DelayedWeorkQueue,而這個DelayedWorkQueue里面你 會發(fā)現(xiàn)它僅僅是對java.util.concurrent.DelayedQueue類一個簡單訪問包裝,這個隊列就是等待隊列,可以看到任務是被直接 放到等待隊列中的,所以取數(shù)據(jù)必然從這里獲取,而這個延遲的隊列有何神奇之處呢,它又是如何實現(xiàn)的呢,我們從什么地方下手去看這個 DelayWorkQueue? 我們還是回頭看看Worker里面的run方法(上一篇文章中已經(jīng)講過):

      public void run() {                             try {                      Runnable task = firstTask;                                      firstTask = null;                                         while (task != null || (task = getTask()) != null) {                               runTask(task);                                                      task = null;                                         }                                   } finally {                                 workerDone(this);                                          }                          }

      這里面要調(diào)用等待隊列就是getTask()方法:

      Runnable getTask() {             for (;;) {                                   try { int state = runState;                                if (state > SHUTDOWN)                    return null;                Runnable r;                if (state == SHUTDOWN)  // Help drain queue                    r = workQueue.poll();                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);                else                    r = workQueue.take();                if (r != null)                    return r;                if (workerCanExit()) {                    if (runState >= SHUTDOWN) // Wake up others                        interruptIdleWorkers();                    return null;                }            } catch (InterruptedException ie) {            }        }

      發(fā)現(xiàn)沒有,如果沒有設置超時,默認只會通過workQueue.take()方法獲取數(shù)據(jù),那么我們就看take方法,而增加到隊列里面的方法自然看offer相關的方法。接下來我們來看下DelayQueue這個隊列的take方法:

      public E take() throws InterruptedException {         final ReentrantLock lock = this.lock;         lock.lockInterruptibly();         try {             for (;;) {                 E first = q.peek();                 if (first == null) {                     available.await();//等待信號,線程一直掛在哪里                 } else {                     long delay =  first.getDelay(TimeUnit.NANOSECONDS);                     if (delay > 0) {                         long tl = available.awaitNanos(delay);//最左等delay的時間段                     } else {                         E x = q.poll();//可以運行,取出一個                         assert x != null;                         if (q.size() != 0)                             available.signalAll();                         return x;                      }                 }             }         } finally {             lock.unlock();         } }

      這里的for就是要找到數(shù)據(jù)為止,否則就等著,而這個“q”和“available”是什么呢?

      private transient final Condition available = lock.newCondition();  private final PriorityQueue q = new PriorityQueue();

      怎么里面還有一層隊列,不用怕,從這里你貌似看出點名稱意味了,就是它是優(yōu)先級隊列,而對于任務調(diào)度來講,優(yōu)先級的方式就是時間,我們用這中猜測來繼續(xù)深入源碼。

      上面首先獲取這個隊列的第一個元素,若為空,就等待一個“available”發(fā)出的信號,我們可以猜測到這個offer的時候會發(fā)出的信號,一會 來驗證即可;若不為空,則通過getDelay方法來獲取時間信息,這個getDelay方法就用上了我們開始說的 ScheduledFutureTask了,如果是時間大于0,則也進入等待,因為還沒開始執(zhí)行,等待也是“available”發(fā)出信號,但是有一個最 長時間,為什么還要等這個信號,是因為有可能進來一個新的任務,比這個等待的任務還要先執(zhí)行,所以要等這個信號;而最多等這么長時間,就是因為如果這段時 間沒任務進來肯定就是它執(zhí)行了。然后就返回的這個值,被Worker(上面有提到)拿到后調(diào)用其run()方法進行運行。

      那么寫入隊列在那里?他們是如何排序的?

      我們看看隊列的寫入方法是這樣的:

      public boolean offer(E e) {         final ReentrantLock lock = this.lock;         lock.lock();         try {             E first = q.peek();             q.offer(e);             if (first == null || e.compareTo(first) < 0)                                   available.signalAll();                             return true;                   } finally {   lock.unlock(); }                          }

      隊列也是首先取出第一個(后面會用來和當前任務做比較),而這里“q”是上面提到的“PriorityQueue”,看來offer的關鍵還在它的里面,我們看看調(diào)用過程:

       public boolean offer(E e) {         if (e == null)             throw new NullPointerException();         modCount++;         int i = size;         if (i >= queue.length)             grow(i + 1);         size = i + 1;         if (i == 0)             queue[0] = e;         else             siftUp(i, e);//主要是這條代碼很關鍵         return true; } private void siftUp(int k, E x) {         if (comparator != null)             siftUpUsingComparator(k, x);         else         //我們默認走這里,因為DelayQueue定義它的時候默認沒有給定義comparator             siftUpComparable(k, x); } /* 可以發(fā)現(xiàn)這個方法是將任務按照compareTo對比后,放在隊列的合適位置,但是它肯定不是絕對順序的,這一點和Timer的內(nèi)部排序機制類似。 */ private void siftUpComparable(int k, E x) {         Comparable key = (Comparable) x;         while (k > 0) {             int parent = (k - 1) >>> 1;             Object e = queue[parent];             if (key.compareTo((E) e) >= 0)                 break;             queue[k] = e;             k = parent;         }         queue[k] = key; }

      你是否發(fā)現(xiàn),compareTo也用上了,就是我們前面描述一大堆的:ScheduledFutureTask類中的一個方法,那么run方法也用上了,這個過程貌似完整了。

      我們再來理一下思路:

      1、調(diào)用的Thread的包裝,由在ThreadPoolExecutor中的Worker調(diào)用你傳入的Runnable的run方法,變成了Worker調(diào)用Runnable的run方法,由它來處理時間片的信息調(diào)用你傳入的線程。

      2、ScheduledFutureTask類在整個過程中提供了基礎參考的方法,其中最為關鍵的就是實現(xiàn)了接口Comparable,實現(xiàn)內(nèi)部的 compareTo方法,也實現(xiàn)了Delayed接口中的getDelay方法用以判定時間(當然Delayed接口本身也是繼承于 Comparable,我們不要糾結(jié)于細節(jié)概念就好)。

      3、等待隊列由在ThreadPoolExecutor中默認使用的LinkedBlockingQueue換成了DelayQueue(它是被 DelayWorkQueue包裝了一下子,沒多大區(qū)別),而DelayQueue主要提供了一個信號量“available”來作為寫入和讀取的信號控 制開關,通過另一個優(yōu)先級隊列“PriorityQueue”來控制實際的隊列順序,他們的順序就是基于上面提到的 ScheduledFutureTask類中的compareTo方法,而是否運行也是基于getDelay方法來實現(xiàn)的。

      4、ScheduledFutureTask類的run方法會判定是否為時間片信息,如果為時間片,在執(zhí)行完對應的方法后,開始計算下一次執(zhí)行時間 (注意判定時間片大于0,小于0,分別代表的是以當前執(zhí)行完的時間為準計算下一次時間還是以當前時間為準),這個在前面有提到。

      5、它是支持多線程的,和Timer的機制最大的區(qū)別就在于多個線程會最征用這個隊列,隊里的排序方式和Timer有很多相似之處,并非完全有序,而是通過位移動來盡量找到合適的位置,有點類似貪心的算法,呵呵。

      上述內(nèi)容就是Java線程池架構(gòu)中的多線程調(diào)度器是什么,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


      名稱欄目:Java線程池架構(gòu)中的多線程調(diào)度器是什么
      文章源于:http://ef60e0e.cn/article/pogipo.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        林周县| 宝山区| 麻栗坡县| 雷州市| 卫辉市| 清镇市| 耿马| 麟游县| 马山县| 宽城| 乐陵市| 会宁县| 延吉市| 公主岭市| 阿瓦提县| 香港| 泾源县| 赤峰市| 西乡县| 胶州市| 普陀区| 湛江市| 侯马市| 临桂县| 安陆市| 山东| 金坛市| 自贡市| 南木林县| 娄烦县| 清水县| 喀什市| 蒲城县| 乐至县| 玛纳斯县| 同德县| 平南县| 绥棱县| 奉贤区| 长岛县| 中西区|