Getting Started With POSIX Threads

Tom Wagner
Don Towsley
Department of Computer Science
University of Massachusetts at Amherst

中文翻譯

宋振華
chsong@iim.nctu.edu.tw
國立交通大學 資訊管理研究所
http://www.iim.nctu.edu.tw/DOC/ProgTools/pthread.txt

1/23/1998


1.導論:thread 是什麼?為什麼要用 thread?

Thread 通常被稱做輕量級的行程(Lightweight process;LWP),這個說法似乎過於簡單了一些,但卻不失為一個好的起點; thread 是 UNIX process 的近親,但卻不完全相同。為了說明何謂 thread ,我們必須先瞭解 UNIX process 與 Mach task 、 thread 間的關係。在 UNIX 中,一個 process 包括了一個執行中的程式,和一些他所需的系統資源,諸如檔案描述表和位址空間等。但是在 Mach 中,一個 task 卻只包括了一些系統資源; 而由thread 掌握了所有的執行活動。一個 Mach task 可能有任意多個 thread , 而 Mach 系統中所有的 thread 均屬於一些 task。屬於同一個 task 的所有 thread 共享該 task 所擁有的系統資源。因此, thread 實質上就是一個程式計數器、一個堆疊再加上一組暫存器。 UNIX 的一個 process 可以看成是一個只有一個 thread 的 Mach task。

跟UNIX process 比起來, thread 是非常嬌小玲瓏的,因此對 CPU 而言,產生一個 thread 是一件相對便宜的工作。另一方面,共享系統資源的 thread 跟獨佔系統資源的 process 比起來,thread 也是相當節省記憶體的。 Mach thread 讓程式設計師們能很方便的做出執行於單一或多重處理器環境下同時執行的程式。不需要考慮處理器多寡的問題,而直接得到多重處理的效能(如果有多的處理器的話)。此外即使在單一CPU 的環境下, 如果程式是屬於常常『休息』的那種,如 file 及 socket I/O, thread 還是能提供效能上的增進。

以下將介紹一些簡單的 POSIX thread ,和他在 DEC OSF/1 OS, V3.0.上的版本(譯註:我是在 solaris 2.5.1 /和 SunOS 4.1.4上測試的啦!差不多。),POSIX thread 簡稱為pthread,他和 non-POSIX 的 cthread非常相近。



2.Hello World

廢話少說,現在就開始吧! pthread_create 函數建立一個 thread 。他需要四個參數: thread 變數、 thread 特性、一個描述 thread 行為的函數和這個函數所需的參數。舉例如下:


pthread_t a_thread;
pthread_attr_t a_thread_attribute;
void thread_function(void *argument);
char *some_argument;

pthread_create( &a_thread, a_thread_attribute, (void *)&thread_function,
(void *) &some_argument);

thread attribute 描述 thread 的一些特性,目前我們只需要用他來指定 thread 至少需要多大的堆疊。在未來會有許多有趣的 thread attribute ,但就目前而言,大部分的程式只需簡單的指定 pthread_attr_default 就可以了。不像 process ,需要使用 fork() system call 讓 child process 和他的 parents 同時開始執行, thread從 pthread_create 中指定的 thread_function 開始執行。理由非常簡單:如果 thread 不從一個另外的地方開始執行,將會造成一堆 thread 使用相同的系統資源執行相同的指令。記得嗎? thread 是『共享』系統資源的。(譯註:在這裡停下來,回憶一下 process 是怎麼產生的... ^_^)

在知道如何產生一個 thread 後,就可以開始我們的第一個 thread 程式了!來設計一個 multi-thread 版的 printf("Hello world\n"); 吧!一開始,我們需要兩個 thread 變數,和一個 thread function ,另外,還要能告訴不同的 thread 印出不同的訊息。我想要讓不同的 thread 印出 "hello world" ,不同的兩個部分 "hello" 和"world"。程式看起來像這樣:


void print_message_function( void *ptr );

main()
{
pthread_t thread1, thread2;
char *message1 = "Hello";
char *message2 = "World";

pthread_create( &thread1, pthread_attr_default,
(void*)&print_message_function, (void*) message1);
pthread_create(&thread2, pthread_attr_default,
(void*)&print_message_function, (void*) message2);

exit(0);
}

void print_message_function( void *ptr )
{
char *message;
message = (char *) ptr;
printf("%s ", message);
}


注意 pthread_create 的參數 print_message_function 和他的參數 message1、message2,這支程式用 pthread_create 產生第一個 thread ,並以 "Hello" 作為其起始參數;接下來產生第二個 thread ,指定其起始參數為 "World" 。第一個 thread開始啟動的時候,從 print_message_function 開始執行,其傳入參數為 "Helllo" 。
他將 "Hello" 印出來,然後結束。第二個 thread 則做差不多的事情:印出 "World"。看起來很合理,但是這個程式有兩個主要的缺陷。

第一個缺點,由於兩個 thread 是同時進行的,所以我們無法保證第一個thread 會先執行到 printf 那一行,所以在螢幕上可能會看到 "Hello World" ,也有可能會看到 "World Hello"。另外,在 main(parent thread)裡的 exit 呼叫將結束整個 process ,這將導致所有的 thread 一起結束。所以說,如果 exit 在 printf 前被執行的話,將不會有任何的輸出產生。事實上,在任何一個 thread (不論 parent or child)裡呼叫 exit 都將導致 process 結束,而所有的 thread 也跟著一起結束了。所以如果要結束一個 thread ,我們必須使用 pthread_exit 這個函數。

在我們小程式裡有兩個競爭條件(race condition),一、看看是 parent process先執行到 exit 呢?還是 child process 先執行到 printf ?二、還有兩個child thread 到底是誰會先印出訊息呢?為了讓程式按照我們希望的順序運作,我們嘗試強迫每個 thread 間相互的等待,下面這個程式加入了兩個 sleep 達成這個目的。


void print_message_function( void *ptr );

main()
{
pthread_t thread1, thread2;
char *message1 = "Hello";
char *message2 = "World";

pthread_create( &thread1, pthread_attr_default,
(void *) &print_message_function, (void *) message1);
sleep(10); //休息一下,等"Hello"印出來再產生下一個 thread

pthread_create(&thread2, pthread_attr_default,
(void *) &print_message_function, (void *) message2);
sleep(10); //休息一下,等"World"印出來再結束。

exit(0);
}

void print_message_function( void *ptr )
{
char *message;
message = (char *) ptr;
printf("%s", message);
pthread_exit(0);
}

這個程式達成我們的目的了嗎?不完全是,原因在於使用 timming delay 來達成thread 間的同步是錯誤的,因為 thread 間的緊密耦合(tightly coupled)特性很容易讓我們使用一些不精確的方法來達成其間的同步處理;然而我們卻不該這麼做。在這個程式中我們遇到的競爭條件和分散式應用程式中,資源共享的情況完全相同。共享的資源為標準輸出,而分散計算成原則為程式中的三個 thread。第一個thread必須在第二個thread 前使用 printf/stdout,而兩者皆必須在 parent thread 呼叫exit 前完成他們的工作。

除了使用 delay 來達成同步的效果外,另一個錯誤發生在 sleep 系統呼叫;如同exit 對 process 的影響一樣,當 thread 呼叫 sleep 時,講導致整個 process 停下來。這表示所有屬於這個 process 的 thread 也將跟著停頓下來。因此在上面這個程式中,呼叫 sleep 除了平白讓程式慢了20秒,並不會有什麼額外影響。另外一個適用的函數是 pthread_delay_np (np 表示 not process)。舉例來說,要讓thread 停頓兩秒鐘,可以用下列程式:

struct timespec delay;
delay.tv_sec = 2;
delay.tv_nsec = 0;
pthread_delay_np( &delay );

本節提到的函數有:pthread_create(),
pthread_exit(),
pthread_delay_np().


3.Thread 同步問題

POSIX 提供了兩組用來使 thread 同步的基本指令: mutex 和 condition
variable。mutex 指的是一組用來控制共享資源存取的一組函數。注意,在使用thread
的情況下,因為整個位址空間都是共用的,所以所有的東西都可以視為共享資源。在
一般情況下, thread 使用一些在pthreadcreate 之前定義或在其所呼叫的函數中定
義的變數來完成其工作,並將他的成果經由整體變數合併。對這些大家都可以存取的
變數,我們必須加以控制。

以下是一個 reader/writer 程式,程式中有一個reader,一個writer,他們共享
一個 buffer,且使用 mutex 來控制這個 buffer 的存取。

void reader_function(void);
void writer_function(void);

char buffer;
int buffer_has_item = 0;
pthread_mutex_t mutex;
struct timespec delay;

main()
{
pthread_t reader;

delay.tv_sec = 2;
delay.tv_nsec = 0;

pthread_mutex_init(&mutex, pthread_mutexattr_default);
pthread_create( &reader, pthread_attr_default, (void*)&reader_function,
NULL);
writer_function();
}

void writer_function(void)
{
while(1)
{
pthread_mutex_lock( &mutex );
if ( buffer_has_item == 0 )
{
buffer = make_new_item();
buffer_has_item = 1;
}
pthread_mutex_unlock( &mutex );
pthread_delay_np( &delay );
}
}

void reader_function(void)
{
while(1)
{
pthread_mutex_lock( &mutex );
if ( buffer_has_item == 1)
{
consume_item( buffer );
buffer_has_item = 0;
}
pthread_mutex_unlock( &mutex );
pthread_delay_np( &delay );
}
}

在這個簡單的程式中,我們假設 buffer 的容量只有 1,因此這個 buffer 有兩
個可能的狀態:『有一筆資料』或『沒有資料』。 writer 首先將 mutex 鎖定,如
果 mutex 已經被鎖定,則暫停,直到 mutex 被解鎖。然後看看 buffer 是否是空的,
若buffer 處於『沒有資料』的狀態,writer 產生一筆新的資料,將其放入 buffer
中。然後將旗標 buffer_has_item 設為 1,讓 reader 可藉此旗標得知 buffer 內有
一筆資料。最後 writer 將 mutex 解鎖,並休息 2 秒鐘,讓 reader 可藉此一空檔
取出 buffer 內的資料。這裡使用的 delay跟之前的 delay 有截然不同的意義,如果
不加上這個 delay 的話,writer 在 unlock mutex 後的下一個指令就是為了產生另
一筆新的資料,再度 lock mutex。這將造成 reader 沒有機會讀取 buffer 中的資料
。因此在此處加上一個 delay 看起來是個不錯的主意。

reader 看起來和 writer 差不多,它首先 lock mutex,然後看看buffer 中是否
有資料,若有資料則將其取出,然後將 mutex 解鎖,接著 delay 2 秒,讓 writer 有
機會放入新的資料。在這個例子中,writer 和 reader 就這樣一直的 run 下去,不斷
的產生/移除 buffer 中的資料。在其他的情況下,我們可能不再需要使用 mutex 了,
此時可以使用 pthread_mutex_destroy(&mutex); 來釋放 mutex。

在初始 mutex 的時候,我們使用了 pthread_mutexattr_default 來當作 mutex
特性。在 OSF/1 中,mutex 特性沒啥用處,所以這樣設就夠了。

mutex 一般用在解決 race condition 問題,但是 mutex 並不是一個很強的機制
,因為他只有兩個狀態:locked 和 unlocked。POSIX 定義的條件變數(condition
variable)將 mutex 的功能加以延伸,能夠做到讓某一個 thread 能暫停,並等待另
一個 thread 的信號(signal)。當信號來了,thread 就醒過來,然後將相關的
mutex lock 起來。這樣的作法可以解決 reader/writer 程式中的 spin-lock 問題。
附錄 A 中有一個使用 mutex 和 condition variable 做成的一個簡單的 integer
semaphores。有關 condition variable 的詳細用法可以參考 man page。

本節提到的函數有:pthread_mutex_init(),
pthread_mutex_lock(),
pthread_mutex_unlock(),
pthread_mutex_destroy().


4. 使用 Semaphores 達成協調工作

(本節中用的Semapore 函數怪怪的,一般我不是這樣用。看起來如果要這
樣寫,必須用附錄 中的 library。)

接下來我們想要用 semaphore 來重寫上節之 reader/writer 程式。用更強悍的
整數 semaphore 來取代 mutex ,並解決 spin-lock 問題。與 Semaphore 相關的運
算有 semaphore_up,semaphore_down,semaphore_init, semaphore_destroy, 和
semaphore_decrement. 其中 semaphore_up 和 semaphore_down 和傳統的 semaphore
語法相同 -- down 運算將在 semaphore 之值小於或等於零時暫停。而 up 運算則遞
增 semaphore。 在使用 semaphore 前必須呼叫 init 函數,而所有 semaphore 的
初始值均為 1。當 semaphore 不再被使用時, destroy 函數可以釋放它。上述所有
函數都只需要一個參數:一個指向 semaphore 物件的指標。

Semaphore_decrement 是一個 non-blocking function 他可以將 semaphore 遞
減到一個負值,這個作法有什麼用處呢?一般用於在初始一個 semaphore 時設定它的
初始值。稍後我們會舉出一個例子。接下來首先看 semaphore 版本的 reader/writer
程式。

void reader_function(void);
void writer_function(void);

char buffer;
Semaphore writers_turn;
Semaphore readers_turn;

main()
{
pthread_t reader;

semaphore_init( &readers_turn );
semaphore_init( &writers_turn );

/* writer must go first */
semaphore_down( &readers_turn );

pthread_create( &reader, pthread_attr_default,
(void *)&reader_function, NULL);
writer_function();
}

void writer_function(void)
{
while(1)
{
semaphore_down( &writers_turn );
buffer = make_new_item();
semaphore_up( &readers_turn );
}
}

void reader_function(void)
{
while(1)
{
semaphore_down( &readers_turn );
consume_item( buffer );
semaphore_up( &writers_turn );
}
}

上面這個例子尚未完前展現 integer semaphore 的威力。接下來我們將修改第二
節中的 Hello World 程式,並使用 semaphore 來修正其 race conditions 問題。

void print_message_function( void *ptr );

Semaphore child_counter;
Semaphore worlds_turn;

main()
{
pthread_t thread1, thread2;
char *message1 = "Hello";
char *message2 = "World";

semaphore_init( &child_counter );
semaphore_init( &worlds_turn );

semaphore_down( &worlds_turn ); /* world goes second */

semaphore_decrement( &child_counter ); /* value now 0 */
semaphore_decrement( &child_counter ); /* value now -1 */
/*
* child_counter now must be up-ed 2 times for a thread blocked on it
* to be released
*
*/


pthread_create( &thread1, pthread_attr_default,
(void *) &print_message_function, (void *) message1);

semaphore_down( &worlds_turn );

pthread_create(&thread2, pthread_attr_default,
(void *) &print_message_function, (void *) message2);

semaphore_down( &child_counter );

/* not really necessary to destroy since we are exiting anyway */
semaphore_destroy ( &child_counter );
semaphore_destroy ( &worlds_turn );
exit(0);
}

void print_message_function( void *ptr )
{
char *message;
message = (char *) ptr;
printf("%s ", message);
fflush(stdout);
semaphore_up( &worlds_turn );
semaphore_up( &child_counter );
pthread_exit(0);
}


很容易可以看出,上面這個程式並沒有race condition 問題,而且也會依照正確
的順序印出結果。其中 semaphore child_counter 的目的在於讓 parent thread 暫停
,直到所有的 children 執行 printf 和緊隨其後的 semaphore_up(&child_counter)



本節提到的函數有:semaphore_init(), semaphore_up(),
semaphore_down(), semaphore_destroy(),
semaphore_decrement().

5.使用實務

Compile 使用 pthread 的程式,必須 include 相關的header file(譯註:一般
是 pthread.h)並且連結 pthread library:

cc hello_world.c -o hello_world -lpthreads
(在 Alpha 上你還要加上 -lc_r)
(譯註:在 solaris 上用 -lthread 或 -lpthread 都可以。)

如果要使用 semaphore 則還必須使用相關的 header file 和 library。

DEC 的 pthread 是根據 POSIX IV 的 thread 標準而非 POSIX VIII 發展出來的。
函數 pthread_join 允許一個 thread 等待另一指定的 thread 到該 thread 結束。因
此在 Hello World 程式中,可以用來判斷 children thread 是否結束。但是在 DEC
上,這個函數不太可靠,在下列程式段中,如果指定的 some_thread 不存在,他將會
造成錯誤,而不是直接 return。

pthread_t some_thread;
void *exit_status;
pthread_join( some_thread, &exit_status );

另外一些奇怪的錯誤可能發生在 thread 函數之外的地方,但是卻肇因於此。
在我們的例子中,並不太去檢查 thread 函數是否正確執行,然而這卻是必要的。幾乎
所有的 pthread 函數都在發生錯誤時 return -1。舉例如下:


pthread_t some_thread;
if ( pthread_create( &some_thread, ... ) == -1 )
{
perror("Thread creation error");
exit(1);
}

semaphore library 在發生錯誤的時候會印出一些訊息然後離開。

文中沒有舉出來,但是蠻有用的一些函數如下。

pthread_yield(); 通知 scheduler thread 想要出讓他的執行權力,
不需要參數。

pthread_t me;
me = pthread_self(); 讓 thread 取得他自己的 identifier。

pthread_t thread;
pthread_detach(thread); 通知 library 在後面的pthread_join 呼叫裡,不需
exit status,可增進 thread 的效率。

Appendix A - Semaphore Library Code


==============================================================================
Semaphore.h follows
==============================================================================



/****************************************************************************\
*
* Written by
* Tom Wagner (wagner@cs.umass.edu)
* at the Distributed Problem Solving Lab
* Department of Computer Science, University of Massachusetts,
* Amherst, MA 01003
*
* Copyright (c) 1995 UMASS CS Dept. All rights are reserved.
*
* Development of this code was partially supported by:
* ONR grant N00014-92-J-1450
* NSF contract CDA-8922572
*
* ---------------------------------------------------------------------------
*
* This code is free software; you can redistribute it and/or modify it.
* However, this header must remain intact and unchanged. Additional
* information may be appended after this header. Publications based on
* this code must also include an appropriate reference.
*
* This code is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE.
*
\****************************************************************************/

#ifndef SEMAPHORES
#define SEMAPHORES

#include
#include


typedef struct Semaphore
{
int v;
pthread_mutex_t mutex;
pthread_cond_t cond;
}
Semaphore;


int semaphore_down (Semaphore * s);
int semaphore_decrement (Semaphore * s);
int semaphore_up (Semaphore * s);
void semaphore_destroy (Semaphore * s);
void semaphore_init (Semaphore * s);
int semaphore_value (Semaphore * s);
int tw_pthread_cond_signal (pthread_cond_t * c);
int tw_pthread_cond_wait (pthread_cond_t * c, pthread_mutex_t * m);
int tw_pthread_mutex_unlock (pthread_mutex_t * m);
int tw_pthread_mutex_lock (pthread_mutex_t * m);
void do_error (char *msg);

#endif




==============================================================================
Semaphore.c follows
==============================================================================




/****************************************************************************\
*
* Written by
* Tom Wagner (wagner@cs.umass.edu)
* at the Distributed Problem Solving Lab
* Department of Computer Science, University of Massachusetts,
* Amherst, MA 01003
*
* Copyright (c) 1995 UMASS CS Dept. All rights are reserved.
*
* Development of this code was partially supported by:
* ONR grant N00014-92-J-1450
* NSF contract CDA-8922572
*
* ---------------------------------------------------------------------------
*
* This code is free software; you can redistribute it and/or modify it.
* However, this header must remain intact and unchanged. Additional
* information may be appended after this header. Publications based on
* this code must also include an appropriate reference.
*
* This code is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE.
*
\****************************************************************************/

#include "semaphore.h"


/*
* function must be called prior to semaphore use.
*
*/
void
semaphore_init (Semaphore * s)
{
s->v = 1;
if (pthread_mutex_init (&(s->mutex), pthread_mutexattr_default) == -1)
do_error ("Error setting up semaphore mutex");

if (pthread_cond_init (&(s->cond), pthread_condattr_default) == -1)
do_error ("Error setting up semaphore condition signal");
}

/*
* function should be called when there is no longer a need for
* the semaphore.
*
*/
void
semaphore_destroy (Semaphore * s)
{
if (pthread_mutex_destroy (&(s->mutex)) == -1)
do_error ("Error destroying semaphore mutex");

if (pthread_cond_destroy (&(s->cond)) == -1)
do_error ("Error destroying semaphore condition signal");
}

/*
* function increments the semaphore and signals any threads that
* are blocked waiting a change in the semaphore.
*
*/
int
semaphore_up (Semaphore * s)
{
int value_after_op;

tw_pthread_mutex_lock (&(s->mutex));

(s->v)++;
value_after_op = s->v;

tw_pthread_mutex_unlock (&(s->mutex));
tw_pthread_cond_signal (&(s->cond));

return (value_after_op);
}

/*
* function decrements the semaphore and blocks if the semaphore is
* <= 0 until another thread signals a change.
*
*/
int
semaphore_down (Semaphore * s)
{
int value_after_op;

tw_pthread_mutex_lock (&(s->mutex));
while (s->v <= 0)
{
tw_pthread_cond_wait (&(s->cond), &(s->mutex));
}

(s->v)--;
value_after_op = s->v;

tw_pthread_mutex_unlock (&(s->mutex));

return (value_after_op);
}

/*
* function does NOT block but simply decrements the semaphore.
* should not be used instead of down -- only for programs where
* multiple threads must up on a semaphore before another thread
* can go down, i.e., allows programmer to set the semaphore to
* a negative value prior to using it for synchronization.
*
*/
int
semaphore_decrement (Semaphore * s)
{
int value_after_op;

tw_pthread_mutex_lock (&(s->mutex));
s->v--;
value_after_op = s->v;
tw_pthread_mutex_unlock (&(s->mutex));

return (value_after_op);
}

/*
* function returns the value of the semaphore at the time the
* critical section is accessed. obviously the value is not guarenteed
* after the function unlocks the critical section. provided only
* for casual debugging, a better approach is for the programmar to
* protect one semaphore with another and then check its value.
* an alternative is to simply record the value returned by semaphore_up
* or semaphore_down.
*
*/
int
semaphore_value (Semaphore * s)
{
/* not for sync */
int value_after_op;

tw_pthread_mutex_lock (&(s->mutex));
value_after_op = s->v;
tw_pthread_mutex_unlock (&(s->mutex));

return (value_after_op);
}



/* -------------------------------------------------------------------- */
/* The following functions replace standard library functions in that */
/* they exit on any error returned from the system calls. Saves us */
/* from having to check each and every call above. */
/* -------------------------------------------------------------------- */


int
tw_pthread_mutex_unlock (pthread_mutex_t * m)
{
int return_value;

if ((return_value = pthread_mutex_unlock (m)) == -1)
do_error ("pthread_mutex_unlock");

return (return_value);
}

int
tw_pthread_mutex_lock (pthread_mutex_t * m)
{
int return_value;

if ((return_value = pthread_mutex_lock (m)) == -1)
do_error ("pthread_mutex_lock");

return (return_value);
}

int
tw_pthread_cond_wait (pthread_cond_t * c, pthread_mutex_t * m)
{
int return_value;

if ((return_value = pthread_cond_wait (c, m)) == -1)
do_error ("pthread_cond_wait");

return (return_value);
}

int
tw_pthread_cond_signal (pthread_cond_t * c)
{
int return_value;

if ((return_value = pthread_cond_signal (c)) == -1)
do_error ("pthread_cond_signal");

return (return_value);
}


/*
* function just prints an error message and exits
*
*/
void
do_error (char *msg)
{
perror (msg);
exit (1);
}
--
創作者介紹

邱小新の工作筆記

台南小新 發表在 痞客邦 PIXNET 留言(0) 人氣()