[线程]glib库线程池代码剖析续ITeyedongfang - 超凡娱乐

[线程]glib库线程池代码剖析续ITeyedongfang

2019-02-02 08:53:16 | 作者: 秋荷 | 标签: 线程,代码,大局 | 浏览: 1676

转载自:

 

 

   Glib-2.12.9中的线程池能够把它看作一个类,进程能够经过这个类创立多个线程池目标。每个线程池目标所创立的线程能够以私有方法运用,也能够以同享方法运用。私有方法是指详细某个线程池目标所创立办理的线程只能是它自己运用,同一进程的其它线程池目标则无权过问。同享方法,望文生义就是指同一进程的一切线程池目标创立的线程以同享的方法运用,这也正是本文要剖析的。关于私有方法的代码剖析和下面要用到的异步行列常识能够在我写的另一篇文章中检查。
       线程池目标同享线程的内部原理并不杂乱,但完结却不是那么简略。特别是同享方法的完结代码要同私有方法和相关的开释办理代码交错在一起更是杂乱。当然咱们只剖析同享方法的相关代码,其它不相关的代码暂不考虑,这样就简略多了。那么先看看它的原理:在同一进程空间的一切线程同享了地址空间,在同享的地址空间中有一个供一切线程池目标同享的大局线程池;当某个详细的线程池目标创立了线程完结使命后,并又不再运用这一刚创立线程时,这一刚创立的线程就交给大局线程池办理;当其它线程池目标想要线程做某一使命时,它就从大局线程池获取线程(没有就自己创立)完结相应的使命,使命完结后又把不再运用的线程放回到大局线程池。
       线程池目标同享方法从某一方面看,它的完结使用了线程的实质。传统的创立线程是线程总和某一函数相关在一起,像是鱼离不开水相同。而Glib完结的线程池同享方法就不相同了,线程池目标创立的线程就是线程,它只要同一进程的同享地址空间,而不跟任何函数耦合。因而无论是哪个线程池目标创立的线程,在大局线程池看来都是相同的:就是个线程,即线程和函数彻底地解耦。这也是我以为Glib库完结线程池中最为精妙之处。
在gthreadpool.h头文件中有如下的数据结构:
39    /* The real GThreadPool is bigger, so you may only create a thread
40    * pool with the constructor function */
41    struct _GThreadPool
42    {
43     GFunc func;
44     gpointer user_data;
45     gboolean exclusive; // 为TRUE表明不同享方法,FALSE表明同享方法
46    };
 
接下来的呈现的一切代码都是来自gthreadpool.c文件的:
35    typedef struct _GRealThreadPool GRealThreadPool;
36
37    struct _GRealThreadPool
38    {
39     GThreadPool pool;
// 使命异步行列,经过g_thread_pool_push接口把数据放入这行列。一个数据就
// 是一个要被线程池目标处理的使命。
40     GAsyncQueue* queue;
……………………..// 点代表省掉的代码
49    };
50  
51    /* The following is just an address to mark the wakeup order for a
52    * thread, it could be any address (as long, as it isnt a valid
53    * GThreadPool address) */
54    static gconstpointer const wakeup_thread_marker = (gconstpointer) g_thread_pool_new;
55    static gint wakeup_thread_serial = 0;
56  
57    /* Here all unused threads are waiting */
58    static GAsyncQueue *unused_thread_queue = NULL; // 用于大局线程池
59    static gint unused_threads = 0; // 大局线程池中所持有线程的个数
60    static gint max_unused_threads = 0; // 大局线程池持有线程的上限
 
线程池目标的创立:
416 GThreadPool*
417 g_thread_pool_new (GFunc            func,
418                   gpointer         user_data,
419                   gint             max_threads,
420                   gboolean         exclusive,
421                   GError         **error)
422 {
423  GRealThreadPool *retval;
…………………
431  retval = g_new (GRealThreadPool, 1);
432
433  retval- pool.func = func;
434  retval- pool.user_data = user_data;
435  retval- pool.exclusive = exclusive;
436  retval- queue = g_async_queue_new ();
…………………
// 办理大局线程池的unused_thread_queue异步行列只被创立一次。这儿运用类
// 似pthread_once之类的来操控是否更好???
444  G_LOCK (init);
// 创立用于大局线程池的异步行列unused_thread_queue
445  if (!unused_thread_queue)
446       unused_thread_queue = g_async_queue_new ();
447  G_UNLOCK (init);
…………………
467  return (GThreadPool*) retval;
468 }
 
结合下面的代码(代码没有按代码行而是按调用先后顺序给出)剖析大局线程池办理线程进程:“存储”线程进程和“分配”线程进程。
办理大局线程池的异步行列unused_thread_queue的解说:
       “存储”线程到大局线程池是代码128行:…_pop (unused_thread_queue),从大局线程池分配线程是代码356行:…_push_unlocked (unused_thread_queue, pool)。假如有多个线程在大局线程池,那么就有多个线程调用128行的pop接口进入睡觉,也即有多个线程在等候恣意一个线程池目标pool的到来并为之效劳。当某个详细的线程池目标有使命(数据)要处理,就调用356行的push接口把要处理使命的线程池目标pool交给了在等候pool的恣意的一个线程,实质上也就是pool获取了大局线程池中恣意一个可用的线程。
大局线程池“存储”线程进程:
       1. 假定大局线程池现在还没有线程,主线程经过某个线程池目标pool履行了代码489行的g_thread_pool_push接口。
2. 由于是同享方法,这时它肯定会履行到代码行504行的g_thread_pool_start_thread
3. 343行的g_thread_pool_start_thread函数因大局线程池没有可用线程,主线程会履行到代码366行调用 g_thread_create创立线程。主线程创立了新线程后持续履行,经过调用层次的回来履行到代码行506把使命(数据)放进线程池目标pool。新的被创立的线程从239行的函数g_thread_pool_thread_proxy开端持续履行。这时新线程中的线程池目标pool和主线程履行到代码506行那个pool是同一个。接下来看新线程是怎么“存储”到大局线程池。
4. 新线程履行到254行获取使命,假如pool还没有使命(主线程还没有把使命放入pool),新线程最会等候1/2秒,这时刻满足让主线程把使命放入pool。因而新线程履行到254行获取就能得到一个待处理的使命,并在265行代码处理使命。
5. 新线程处理完使命后就进入了下一次循环。当它又履行代码254行获取使命时,这时因pool中没有使命了,函数 g_thread_pool_wait_for_new_task就回来一个空的使命。然后就会履行到324行调用函数 g_thread_pool_wait_for_new_pool。
6. 代码88行的函数g_thread_pool_wait_for_new_pool会履行到代码128行调用函数g_async_queue_pop接口。到此被创立的新线程就“存储”到了大局线程池。从代码324行可看出,线程所运用的pool已不再局限于主线程刚创立新线程时所给的pool了,也就是到达了一切的pool都能够同享此新线程。
大局线程池“分配”线程进程:
1. 同“存储”线程进程的前两步相同履行到343行的g_thread_pool_start_thread函数。
2. 由于大局线程池已有可用的闲暇线程,所以函数g_thread_pool_start_thread会履行到代码行356调用函数 g_async_queue_push_unlocked把当时的线程池目标pool给异步行列unused_thread_queue,也就是经过入队的pool唤醒一个等候线程(可用的闲暇线程)。
3. 主线程唤醒了一个等候线程后持续履行,经过调用层次的回来履行到代码行506把使命(数据)放进当时线程池目标pool。由“存储”线程进程的第6步可知,线程的等候是在代码128行调用函数g_async_queue_pop,因而被唤醒的线程由这儿开端履行。而…_pop接口回来的pool就是主线程调用…_push放入的有使命要处理的pool。
4. 被唤醒的线程履行到代码324行,然后又是从代码250行的循环开端履行。
5. 接下来的履行进程跟“存储”线程进程后三个过程是相同的。
 
489 g_thread_pool_push (GThreadPool *pool,
490                    gpointer      data,
491                    GError      **error)
492 {
493  GRealThreadPool *real;
494
495  real = (GRealThreadPool*) pool;
………………………
500  g_async_queue_lock (real- queue);
501
// “ = 0”表明当时线程池目标没具有闲暇的线程。为什么“ = 0”就是这种状况
// ,这是异步行列的常识,能够参阅上面供给的链接。
// 其实假如是同享方法运用当时线程池目标的话,当时线程池目标就不该具有闲暇线程,
// 由于闲暇线程都交给大局线程池了。
502  if (g_async_queue_length_unlocked (real- queue) = 0)
503     /* No thread is waiting in the queue */
504     g_thread_pool_start_thread (real, error);
505
506  g_thread_pool_queue_push_unlocked (real, data);
507  g_async_queue_unlock (real- queue);
508 }
 
343 g_thread_pool_start_thread (GRealThreadPool *pool,
344                           GError          **error)
345 {
346  gboolean success = FALSE;
347 
// 判别当时线程池具有的线程是否现已到达它的上限,假如是就回来。
348  if (pool- num_threads = pool- max_threads pool- max_threads != -1)
349     /* Enough threads are already running */
350     return;
351
352  g_async_queue_lock (unused_thread_queue);
353
// 判别大局线程池是否有可用的闲暇线程,假如有
// 就调用g_async_queue_push_unlocked (unused_thread_queue, pool)让
// 当时线程池目标获取一个闲暇的线程
354  if (g_async_queue_length_unlocked (unused_thread_queue) 0)
355  {
356       g_async_queue_push_unlocked (unused_thread_queue, pool);
357       success = TRUE;
358  }
359
360  g_async_queue_unlock (unused_thread_queue);
361
// 假如大局线程池没有可用线程,success仍是为FALSE,
// 则调用g_thread_create新建一个线程
362  if (!success)
363  {
364       GError *local_error = NULL;
365       /* No thread was found, we have to start a new one */
366       g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, local_error);
………………………
373  }
374
375  /* See comment in g_thread_pool_thread_proxy as to why this is done
376    * here and not there
377    */
378  pool- num_threads++;
379 }
 
239 g_thread_pool_thread_proxy (gpointer data)
240 {
241  GRealThreadPool *pool;
242
243  pool = data;
………………………
248  g_async_queue_lock (pool- queue);
249
250  while (TRUE)
251  {
252       gpointer task;
253
// 假如task不为空,当时线程就用线程池目标的使命处理函数pool- pool.func处理任
// 务,处理完后持续循环。
// 假如task为空并且是同享方法,就调用g_thread_pool_wait_for_new_pool把当时
// 线程交给了大局线程池。
254       task = g_thread_pool_wait_for_new_task (pool);
255       if (task)
256          {
257            if (pool- running || !pool- immediate)
258             {
259               /* A task was received and the thread pool is active, so
260                * execute the function.
261                */
262               g_async_queue_unlock (pool- queue);
263               DEBUG_MSG (("thread %p in pool %p calling func.",
264                        g_thread_self (), pool));
// 线程依据线程池目标pool处理使命
265               pool- pool.func (task, pool- pool.user_data);
266              g_async_queue_lock (pool- queue);
267             }
268          }
269       else
270          {
271            /* No task was received, so this thread goes to the global
272              * pool.
273              */
…………………….
324            if ((pool = g_thread_pool_wait_for_new_pool ()) NULL)
325               break;
…………………….
336          }
337  }
338
339  return NULL;
340 }
 
 
180 static gpointer
181 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
182 {
183  gpointer task = NULL;
184
185  if (pool- running || (!pool- immediate
186                       g_async_queue_length_unlocked (pool- queue) 0))
187  {
188       /* This thread pool is still active. */
189       if (pool- num_threads pool- max_threads pool- max_threads != -1)
190          {
191            /* This is a superfluous thread, so it goes to the global pool. */
192            DEBUG_MSG (("superfluous thread %p in pool %p.",
193                        g_thread_self (), pool));
194          }
195       else if (pool- pool.exclusive) // 不同享方法
196          {
……………………..
204          }
205       else // 同享方法,等候1/2秒后回来空的task
206          {
207            /* A thread will wait for new tasks for at most 1/2
208              * second before going to the global pool.
209              */
210            GTimeVal end_time;
211
212            g_get_current_time ( end_time);
213            g_time_val_add ( end_time, G_USEC_PER_SEC / 2);       /* 1/2 second */
214
215            DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
216                        "(%d running, %d unprocessed).",
217                        g_thread_self (), pool, pool- num_threads,
218                        g_async_queue_length_unlocked (pool- queue)));
219
220            task = g_async_queue_timed_pop_unlocked (pool- queue, end_time);
221          }
222  }
223  else
224     {
225       /* This thread pool is inactive, it will no longer process tasks. */
……………….
232     }
233
234  return task;
235 }
 
 
// 从“存储”当时线程到大局线程池看,函数的功用是把当时线程交给大局线程池。
// 从线程池目标获取线程看,函数的功用是从大局线程池“分配”线程,即被回来的
// pool获得了一个线程。
87 static GRealThreadPool*
 88 g_thread_pool_wait_for_new_pool (void)
 89 {
 90  GRealThreadPool *pool;
………………….
// 大局线程池中具有的线程个数加一,回为当时线程要进入到大局线程池。
101  g_atomic_int_inc ( unused_threads);
102
103  do
104  {
105       if (g_atomic_int_get ( unused_threads) = local_max_unused_threads)
106          {
107            /* If this is a superfluous thread, stop it. */
108            pool = NULL;
109          }
110       else if (local_max_idle_time 0)
111          {
112            /* If a maximal idle time is given, wait for the given time. */
113            GTimeVal end_time;
……………………
// 使用异步行列的pop接口把当时线程交给大局线程池,使它自己进入等候状况
121            pool = g_async_queue_timed_pop (unused_thread_queue, end_time);
122          }
123       else
124          {
125            /* If no maximal idle time is given, wait indefinitely. */
………………………
// 使用异步行列的pop接口把当时线程交给大局线程池,使它自己进入等候状况
128            pool = g_async_queue_pop (unused_thread_queue);
129          }
………………………
172  }
173  while (pool wakeup_thread_marker);
174
// 大局线程池中具有的线程个数减一,回为当时线程离开了大局线程池,
// pool获取了当时线程。
175  g_atomic_int_add ( unused_threads, -1);
176
177  return pool;
178 }
 

版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表超凡娱乐立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章