Pthread:共享内存系统并行编程模型

image-20230920190726082

所有处理器能看到同样的内存地址空间

因为访问冲突会加重,一般最多32个核

线程:轻量级的进程,多个线程可以共享进程中的空间

基于c和c++,基于类unix系统(linux,windows)

每个线程都有一个在进程中唯一的线程标识符,用一个数据类型 pthread_t 表示,该数据类型在 Linux 中就是一个无符号长整型数据。

常用函数

image-20230918135446436

线程的创建:pthread_create

pthread_create:创建一个线程,在其中执行对应的函数start_routine。线程需要提前利用pthread_t threads[NUM_THREADS];定义

1
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);

功能说明:创建一个线程。

参数说明:

  • thread 是线程标识符,但这个参数不是由用户指定的,而是由 pthread_create 函数在创建时将新的线程的标识符放到这个变量中。
    • 需要提前利用pthread_t threads[NUM_THREADS];定义线程
  • attr 指定线程的属性,可以用 NULL 表示默认属性。
  • start_routine 是一个函数指针,指定在线程中运行的函数。
  • argstart_routine 所需要的参数,是一个无类型指针

默认地,线程在被创建时要被赋予一定的属性,这个属性存放在数据类型 pthread_attr_t 中,它包含了线程的调度策略,堆栈的相关信息,join or detach 的状态等。

pthread_attr_initpthread_attr_destroy 函数分别用来创建和销毁 pthread_attr_t,具体函数声明可参考man帮助。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS 8

char *messages[NUM_THREADS];
//标注了线程相应的信息
struct thread_data
{
int thread_id;
int sum;
char *message;
} thread_data_array[NUM_THREADS] ;

void *PrintHello(void *threadarg)
{
int taskid, sum;
char *hello_msg;
struct thread_data *my_data;

sleep(1);
my_data = (struct thread_data *) threadarg;
taskid = my_data->thread_id;
sum = my_data->sum;
hello_msg = my_data->message;
printf("Thread %d: %s Sum=%d\n", taskid, hello_msg, sum);
pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
//定义所需要的线程
pthread_t threads[NUM_THREADS];
int *taskids[NUM_THREADS];
int rc, t, sum;

sum = 0;
messages[0] = "English: Hello World!";
messages[1] = "French: Bonjour, le monde!";
messages[2] = "Spanish: Hola al mundo";
messages[3] = "Klingon: Nuq neH!";
messages[4] = "German: Guten Tag, Welt!";
messages[5] = "Russian: Zdravstvytye, mir!";
messages[6] = "Japan: Sekai e konnichiwa!";
messages[7] = "Latin: Orbis, te saluto!";

for (t = 0; t < NUM_THREADS; t++)
{
sum = sum + t;
thread_data_array[t].thread_id = t;
thread_data_array[t].sum = sum;
thread_data_array[t].message = messages[t];
printf("Creating thread %d\n", t);

//创建线程,执行函数,如果成功,返回0
rc = pthread_create(&threads[t], NULL, PrintHello, (void *)&thread_data_array[t]);

if (rc)
{
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
pthread_exit(NULL);
}

对线程的阻塞:pthread_join

pthread_join:函数原型:

1
2
3
int pthread_join(pthread_t thread, void **status);
//thread:代表要等待线程的线程 ID
//status:获取该线程的退出状态

pthread_join 作用是等待一个指定的线程结束执行,然后才继续执行当前线程。函数会让调用它的线程等待 threadid 线程运行结束之后再运行,可以对每个线程使用一次。

  1. 等待线程结束:当一个线程调用 pthread_join(thread, status) 时,它会被阻塞,直到线程 thread 完成其执行。只有当线程 thread 终止后,pthread_join 才会返回。
  2. 获取线程的退出状态pthread_join 的第二个参数 status 是一个指向整数的指针,用于存储线程的退出状态。当线程 thread 结束时,它的退出状态会被写入到 status 指向的地址中,以便父线程可以查看线程的返回值或错误状态。

join的含义

在并行计算中,“join” 通常表示等待其他线程完成其工作,以便能够收集和整合它们的结果或执行后续操作。这在多线程或多进程的并行计算中非常常见,其中一个主线程(或进程)会创建和管理多个子线程(或子进程),并使用 pthread_join(或其他等待机制)等待它们完成任务

例如,假设您有一个并行计算任务,将一个大数组分成多个部分,每个部分由一个线程处理。主线程可以创建这些线程,然后使用 pthread_join 等待它们完成处理。一旦所有线程都完成了它们的工作,主线程可以汇总它们的结果或执行其他操作。

有可能主线程完成了,但副线程还没完成,使用join进行等待。而且不只有 pthread_join 中的线程在运行,而是主调用线程(调用 pthread_join 的线程)会被阻塞,等待被指定的线程(作为参数传递给 pthread_join 的线程)运行完成后才会继续执行。被指定的线程仍然可以在后台执行,而主调用线程会被阻塞,直到指定线程完成其任务。

进程之间的交互

在多线程计算时,经常会有多个线程访问一些共享资源,因为彼此读取的不同步造成计算结果有无。

在多进程工作时,因为不同进程的处理时间有区别,对存储器的取值/赋值不一定是连续的,可能会丢失值

在计算π\pi时,如果使用不同进程计算不同部分,在总进程数增加时,精度越来越低。一个示例:sum+=i

image-20230918135708774

注意:把调节读存的控制全局变量的影响调节到最小。有时候无法预估线程的时序先后,取决于OS的调度

互斥锁 Mutex

Mutex 常常被用来保护那些可以被多个线程访问的共享资源,比如可以防止多个线程同时更新同一个数据时出现混乱。

使用互斥锁的一般步骤是:

  • 创建一个互斥锁,即声明一个pthread_mutex_t类型的数据,然后初始化,只有初始化之后才能使用;
  • 多个线程尝试锁定这个互斥锁;
  • 锁定互斥锁的线程才能够运行,否则不能运行
    • 只有一个成功锁定互斥锁,成为互斥锁的拥有者,然后进行一些指令;
    • 拥有者解锁互斥锁
    • 其他线程尝试锁定这个互斥锁,重复上面的过程;
  • 最后互斥锁被显式地调用 pthread_mutex_destroy 来进行销毁。

有两种方式初始化一个互斥锁:

第一种,利用已经定义的常量初始化,例如

1
pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;

第二种方式是调用 pthread_mutex_init (mutex,attr) 进行初始化。

当多个线程同时去锁定同一个互斥锁时,对于失败的那些线程:

  • 如果是用 pthread_mutex_lock 函数,那么会被阻塞,直到这个互斥锁被解锁,它们再继续竞争;
  • 如果是用 pthread_mutex_trylock 函数,那么失败者只会返回一个错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <pthread.h>

// 定义共享资源和互斥锁
int shared_resource = 0;
pthread_mutex_t mutex;

void* thread_function(void* arg)
{
// 加锁
pthread_mutex_lock(&mutex);
// 访问共享资源
shared_resource++;
// 解锁
pthread_mutex_unlock(&mutex);

return NULL;
}

int main()
{
// 初始化互斥锁
pthread_mutex_init(&mutex, NULL);

// 创建多个线程来访问共享资源
pthread_t thread1, thread2;
pthread_create(&thread1, NULL, thread_function, NULL);
pthread_create(&thread2, NULL, thread_function, NULL);

// 等待线程结束
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);

// 销毁互斥锁
pthread_mutex_destroy(&mutex);

return 0;
}

常用函数

1
2
3
4
5
6
7
pthread_mutex_init (mutex,attr);
pthread_mutex_destroy (pthread_mutex_t *mutex);
pthread_mutexattr_init (attr);
pthread_mutexattr_destroy (attr);
phtread_mutex_lock(pthread_mutex_t *mutex);
phtread_mutex_trylock(pthread_mutex_t *mutex);
phtread_mutex_unlock(pthread_mutex_t *mutex);
  1. pthread_mutex_init(mutex, attr);
    • 作用:用于初始化一个互斥锁。
    • 参数:mutex:要初始化的互斥锁对象的指针。
      • attr:互斥锁的属性,通常可以传递 NULL 来使用默认属性。
    • 示例pthread_mutex_init(&mutex, NULL);
  2. pthread_mutex_destroy(pthread_mutex_t *mutex);
    • 作用:用于销毁一个互斥锁,释放与之关联的资源。
    • 参数:mutex:要销毁的互斥锁对象的指针。
    • 示例pthread_mutex_destroy(&mutex);
  3. pthread_mutexattr_init(pthread_mutexattr_t *attr);
    • 作用:用于初始化一个互斥锁属性对象,以便设置互斥锁的属性。
    • 参数:attr:要初始化的互斥锁属性对象的指针。
    • 示例pthread_mutexattr_init(&attr);
  4. pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
    • 作用:用于销毁一个互斥锁属性对象,释放与之关联的资源。
    • 参数:attr:要销毁的互斥锁属性对象的指针。
    • 示例pthread_mutexattr_destroy(&attr);
  5. pthread_mutex_lock(pthread_mutex_t *mutex);
    • 作用:用于锁定(获取)一个互斥锁,阻止其他线程进入临界区直到锁被释放。
    • 参数:mutex:要锁定的互斥锁对象的指针。
    • 示例pthread_mutex_lock(&mutex);
  6. pthread_mutex_trylock(pthread_mutex_t *mutex);
    • 作用:尝试锁定一个互斥锁,但如果锁已经被其他线程占用,则不会阻塞当前线程,而是立即返回一个错误代码。
    • 参数:mutex:要尝试锁定的互斥锁对象的指针。
    • 返回值:成功锁定返回0,否则返回错误代码。
    • 示例int result = pthread_mutex_trylock(&mutex);
  7. pthread_mutex_unlock(pthread_mutex_t *mutex);
    • 作用:用于解锁(释放)一个互斥锁,允许其他线程进入临界区。
    • 参数:mutex:要解锁的互斥锁对象的指针。
    • 示例pthread_mutex_unlock(&mutex);

条件变量 Condition Variable

互斥锁只有两种状态,这限制了它的用途。条件变量允许线程在阻塞的时候等待另一个线程发送的信号,当收到信号后,阻塞的线程就被唤醒并试图锁定与之相关的互斥锁。条件变量要和互斥锁结合使用

  1. 一个线程首先获得互斥锁,获得对共享资源的访问权,同时对mutex上锁
  2. 在上锁后,由于默写条件,使用条件变量,让此线程沉睡,暂时释放互斥锁,由别的线程获得对共享资源的访问权
  3. 在到达条件变量的条件时,解锁互斥锁,发送信号到沉睡线程处,重新获得互斥锁,此时别的线程没有互斥锁
  4. 原先沉睡的线程继续运行

条件变量的声明和初始化

通过声明 pthread_cond_t 类型的数据,并且必须先初始化才能使用。

初始化的方法也有两种:

第一种,利用内部定义的常量,例如:

1
pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER;

第二种,利用函数 pthread_cond_init(cond,attr),其中 attrpthread_condattr_init()pthread_condattr_destroy() 创建和销毁。

可以用 pthread_cond_destroy() 销毁一个条件变量。

相关函数:

1
2
3
pthread_cond_wait (condition,mutex);
pthread_cond_signal (condition);
pthread_cond_broadcast (condition);

pthread_cond_wait() 会阻塞调用它的线程,直到收到某一信号。这个函数需要在 mutex 已经被锁之后进行调用,并且当线程被阻塞时,会自动解锁 mutex。信号收到后,线程被唤醒,这时 mutex 又会被这个线程锁定。

pthread_cond_signal() 函数结束时,必须解锁 mutex,以供 pthread_cond_wait() 锁定mutex。

当不止一个线程在等待信号时,要用 pthread_cond_broadcast()代替 pthread_cond_signal() 来告诉所有被该条件变量阻塞的线程结束阻塞状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS 3
#define TCOUNT 10
#define COUNT_LIMIT 12

int count = 0;
pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;

void *inc_count(void *t)
{
int i;
long my_id = (long)t;

for (i = 0; i < TCOUNT; i++)
{
pthread_mutex_lock(&count_mutex);
count++;

/*
Check the value of count and signal waiting thread when condition is reached.
Note that this occurs while mutex is locked.
*/
if (count == COUNT_LIMIT)
{
printf("inc_count(): thread %ld, count = %d Threshold reached. ", my_id, count);
//到达阈值,发出信号,解锁mutex锁定的线程
pthread_cond_signal(&count_threshold_cv);
printf("Just sent signal.\n");
}
printf("inc_count(): thread %ld, count = %d, unlocking mutex\n", my_id, count);
pthread_mutex_unlock(&count_mutex);

// Do some work so threads can alternate on mutex lock
sleep(1);
}
pthread_exit(NULL);
}

void *watch_count(void *t)
{
long my_id = (long)t;

printf("Starting watch_count(): thread %ld\n", my_id);

/*
Lock mutex and wait for signal.
Note that the pthread_cond_wait routine will automatically and atomically unlock mutex while it waits.
Also, note that if COUNT_LIMIT is reached before this routine is run by the waiting thread, the loop will be skipped to prevent pthread_cond_wait from never returning.
*/
//加锁,访问共享的count资源
pthread_mutex_lock(&count_mutex);
while (count < COUNT_LIMIT)
{
printf("watch_count(): thread %ld Count= %d. Going into wait...\n", my_id, count);
//调用 pthread_cond_wait 函数,将自己挂起,等待条件变量 count_threshold_cv 的信号。
//当其他线程通过 pthread_cond_signal 发出信号时,该线程将被唤醒,然后它会重新检查 count 的值,并在此基础上做一些操作
pthread_cond_wait(&count_threshold_cv, &count_mutex);
printf("watch_count(): thread %ld Condition signal received. Count= %d\n", my_id, count);
printf("watch_count(): thread %ld Updating the value of count...\n", my_id, count);
count += 125;
printf("watch_count(): thread %ld count now = %d.\n", my_id, count);
}
printf("watch_count(): thread %ld Unlocking mutex.\n", my_id);
//解锁互斥锁,释放对 count 的访问
pthread_mutex_unlock(&count_mutex);
pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
int i, rc;
long t1 = 1, t2 = 2, t3 = 3;
pthread_t threads[3];
pthread_attr_t attr;

// Initialize mutex and condition variable objects
//初始化mutex
pthread_mutex_init(&count_mutex, NULL);
//初始化条件变量
pthread_cond_init(&count_threshold_cv, NULL);

//For portability, explicitly create threads in a joinable state
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&threads[0], &attr, watch_count, (void *)t1);
pthread_create(&threads[1], &attr, inc_count, (void *)t2);
pthread_create(&threads[2], &attr, inc_count, (void *)t3);

/* Wait for all threads to complete */
for (i=0;i<NUM_THREADS;i++)
pthread_join(threads[i], NULL);
printf("Main(): Waited and joined with %d threads. Final value of count = %d. Done.\n", NUM_THREADS, count);

/* Clean up and exit */
pthread_attr_destroy(&attr);
pthread_mutex_destroy(&count_mutex);
pthread_cond_destroy(&count_threshold_cv);
pthread_exit(NULL);
}

线程的性质:结合与分离

在任何一个时间点上,线程是可结合的(joinable),或者是分离的(detached)。

  • 一个可结合的线程能够被其他线程收回其资源和杀死;在被其他线程回收之前,它的存储器资源(如栈)是不释放的。

  • 相反,一个分离的线程是不能被其他线程回收或杀死的,它的存储器资源在它终止时由系统自动释放。

线程的分离状态决定一个线程以什么样的方式来终止自己

  • 默认情况下线程是非分离状态的,这种情况下,原有的线程等待创建的线程结束。只有当pthread_join()函数返回时,创建的线程才算终止,才能释放自己占用的系统资源。

  • 而分离线程不是这样子的,它没有被其他的线程所等待,自己运行结束了,线程也就终止了,马上释放系统资源。

    • 如果在创建线程时就知道不需要了解线程的终止状态,则可以pthread_attr_t结构中的detachstate线程属性,让线程以分离状态启动。

    设置线程分离状态的函数为:

1
2
3
4
// 第二个参数可选为:
// PTHREAD_CREATE_DETACHED(分离线程)
// PTHREAD _CREATE_JOINABLE(非分离线程)
pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate)

这里要注意的一点是,如果设置一个线程为分离线程,而这个线程运行又非常快,它很可能pthread_create函数返回之前就终止了,它终止以后就可能将线程号和系统资源移交给其他的线程使用,这样调用pthread_create的线程就得到了错误的线程号。

要避免这种情况可以采取一定的同步措施,最简单的方法之一是可以在被创建的线程里调用pthread_cond_timewait函数,让这个线程等待一会儿,留出足够的时间让函数pthread_create返回。设置一段等待时间,是在多线程编程里常用的方法。但是注意不要使用诸如wait()之类的函数,它们是使整个进程睡眠,并不能解决线程同步的问题。

另外一个可能常用的属性是线程的优先级,它存放在结构sched_param中。用函数pthread_attr_getschedparam和函数pthread_attr_setschedparam进行存放,一般说来,我们总是先取优先级,对取得的值修改后再存放回去。

线程等待——正确处理线程终止

1
2
3
4
5
#include <pthread.h>
void pthread_exit(void *retval);
//挂起等待th结束, *thread_return=retval;
void pthread_join(pthread_t th,void *thread_return);
int pthread_detach(pthread_t th);

如果线程处于joinable状态,则只能只能被创建他的线程等待终止。

在Linux平台默认情况下,虽然各个线程之间是相互独立的,一个线程的终止不会去通知或影响其他的线程。但是已经终止的线程的资源并不会随着线程的终止而得到释放,需要调用 pthread_join() 来**获得另一个线程的终止状态并且释放该线程所占的资源。**在thread_join中存储了返回状态。

调用该函数的线程将挂起,等待 th 所表示的线程的结束。 thread_return 是指向线程 th 返回值的指针。并且只可以有唯一的一个线程对 th 调用 pthread_join()

需要注意的是 th 所表示的线程必须是 joinable 的,即处于非 detached(游离)状态;

如果 th 处于 detached 状态,那么对 th 的 pthread_join() 调用将返回错误。

如果不关心一个线程的结束状态,那么也可以将一个线程设置为 detached 状态,从而让操作系统在该线程结束时来回收它所占的资源。将一个线程设置为detached 状态可以通过两种方式来实现。

  • 一种是调用 pthread_detach() 函数,可以将线程 th 设置为 detached 状态。
  • 另一种方法是在创建线程时就将它设置为 detached 状态,首先初始化一个线程属性变量,然后将其设置为 detached 状态,最后将它作为参数传入线程创建函数 pthread_create(),这样所创建出来的线程就直接处于 detached 状态。

创建 detach 线程:

1
2
3
4
5
pthread_t tid;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&tid, &attr, THREAD_FUNCTION, arg);

总之为了在使用 pthread 时避免线程的资源在线程结束时不能得到正确释放,从而避免产生潜在的内存泄漏问题,在对待线程结束时,要确保该线程处于 detached 状态,否着就需要调用 pthread_join() 函数来对其进行资源回收。