编程中、通信中所说的同步与生活中大家印象中的同步概念略有差异。“同”字应是指协同、协助、互相配合。主旨在协同步调,按预定的先后次序运行
线程同步,指一个线程发出某一功能调用时,在没有得到结果之前,该调用不返回。同时其它线程为保证数据一致性,不能调用该功能。
线程同步可以通过锁来实现!与锁相关的部分函数的man page
需要单独安装。
1
| sudo apt install manpages-posix-dev
|
mutex 互斥量(互斥锁)
每个线程在对资源操作前都尝试先加锁,成功加锁才能操作,操作结束解锁。
资源还是共享的,线程间也还是竞争的,但通过“锁”就将资源的访问变成互斥操作。
互斥锁实质上是操作系统提供的一把“建议锁”(又称“协同锁”),建议程序中有多线程访问共享资源的时候使用该机制。但,并没有强制限定。因此,即使有了 mutex,如果有线程不按规则来访问数据,依然会造成数据混乱。
尽量保证锁的粒度,越小越好(访问共享数据前,加锁。访问结束立即解锁)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| #include <pthread.h>
pthead_mutex_t muetx = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
|
pthread_mutex_init 函数
初始化互斥锁。
如果互斥锁 mutex 是静态分配的(定义在全局,或加了 static 关键字修饰),可以直接使用宏进行初始化。
pthead_mutex_t muetx = PTHREAD_MUTEX_INITIALIZER;
局部变量应采用动态初始化,即使用 pthread_mutex_init(&mutex, NULL);
初始化。
1 2 3 4
| #include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
|
pthread_mutexattr_t 的选项包括:
- PTHREAD_MUTEX_NORMAL:普通锁。普通锁不允许递归调用,如果线程试图对已经拥有的锁再次加锁,它将阻塞。
- PTHREAD_MUTEX_RECURSIVE:递归锁。递归锁允许线程多次加锁,每次必须解锁一次。
- PTHREAD_MUTEX_ERRORCHECK:检查锁。检查锁与普通锁相似,但它还允许线程检查是否当前已经拥有该锁。
- PTHREAD_MUTEX_DEFAULT:默认锁类型。默认锁类型是普通锁。
用户可以根据需要使用不同类型的互斥量,例如递归锁用于处理递归函数,检查锁用于检测错误,而普通锁是一般的互斥量。
死锁
- 线程试图对同一个互斥量 A 加锁两次。
- 线程 1 拥有 A 锁,请求获得 B 锁;线程 2 拥有 B 锁,请求获得 A 锁。
example
测试1:在不加锁的情况下,下面的i++
和printf
之间可能被另一个线程打断,导致输出的i
的大小不连续!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <time.h> #include <unistd.h>
int i = 0;
void *thread_func(void *args) { while(1) { i++; printf("thread_func: %d \n", i); usleep(10); } return NULL; }
int main() { pthread_t tid; pthread_create(&tid, NULL, thread_func, NULL); while(1) { i++; printf("main_func: %d \n", i); usleep(10); } return 0; }
|
加锁保证数据不被另一个线程修改!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <time.h> #include <unistd.h>
int i = 0; pthread_mutex_t mutex_i = PTHREAD_MUTEX_INITIALIZER;
void *thread_func(void *args) { while(1) { pthread_mutex_lock(&mutex_i); i++; printf("thread_func: %d \n", i); pthread_mutex_unlock(&mutex_i); usleep(10); } return NULL; }
int main() {
pthread_t tid; pthread_create(&tid, NULL, thread_func, NULL);
while(1) { pthread_mutex_lock(&mutex_i); i++; printf("main_func: %d \n", i); pthread_mutex_unlock(&mutex_i); usleep(10); } return 0; }
|
rwlock 读写锁
- 读共享,写独占。
- 写锁优先级高于读锁(读锁写锁同时来,优先处理写锁。读锁加锁成功后,写锁也必须等待读锁释放)。
- 锁还是只有一把,但分以读模式加锁和以写模式加锁两种。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| #include <pthread.h>
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr); int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
|
cond 条件变量
条件变量本身不是锁!但它也可以造成线程阻塞。通常与互斥锁配合使用。给多线程提供一个会合的场所。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| #include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr); int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
|
pthread_cond_wait 函数
阻塞等待一个条件变量!
1 2
| int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
|
- cond:条件变量的地址
- mutex:互斥锁的地址
- 返回值:0(成功),非 0(失败)
函数作用:
- 阻塞等待条件变量
cond
满足
- 释放已掌握的互斥锁(解锁互斥量)相当于
pthread_mutex_unlock(&mutex)
; 1.2.两步为一个原子操作。
- 当被唤醒,
pthread_cond_wait
函数返回时,解除阻塞并重新申请获取互斥锁 pthread_mutex_lock(&mutex)
;
注意:pthread_cond_broadcast
会唤醒所有阻塞在该条件变量上的线程,但并不意味这所有线程会同时收到pthread_cond_wait
函数返回,因为该函数返回前还有获取互斥锁这一步,任意一个线程获取锁后,其他线程都会阻塞等待!所以,实际上线程的唤醒任然有先后顺序!
此函数的原理不太好理解,建议看视频介绍条件变量原理!
pthread_cond_timedwait 函数
阻塞等待一个条件变量,最长等待到 abstime 时刻。
1 2 3
| int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
|
- cond:条件变量的地址
- mutex:互斥锁的地址
- abstime:等待的时间,绝对时间(即从1970年1月1日零时起的纳秒数)
- 返回值:0(成功),非 0(失败)
生产者消费者实现
条件变量和互斥量是对产品列表加锁,无论是生产者还是消费者,想要生产或消费产品,都需要先获得锁,才能对产品列表进行操作!
这里用到的核心函数就是[pthread_cond_wait](#pthread_cond_wait 函数),只要理解该函数的逻辑,就容易理解下面的模型!
此处实现的产品是后生产的产品先被消费(栈)!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h>
struct msg { int num; struct msg *next; };
struct msg *product; pthread_cond_t has_product = PTHREAD_COND_INITIALIZER; pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void *consumer(void *arg) { struct msg *p; while(1) { pthread_mutex_lock(&lock);
while(product == NULL) { pthread_cond_wait(&has_product, &lock); }
p = product; product = product->next; printf("consumer %lu --- product %d\n", pthread_self(), p->num); pthread_mutex_unlock(&lock);
free(p); sleep(rand() % 5); } return NULL; }
void *producer(void *arg) { struct msg *p; while(1) { p = malloc(sizeof(struct msg)); p->num = rand() % 1000 + 1;
pthread_mutex_lock(&lock); printf("producer %lu --- product %d\n", pthread_self(), p->num); p->next = product; product = p; pthread_mutex_unlock(&lock);
pthread_cond_broadcast(&has_product); sleep(rand() % 2); } return NULL; }
int main() {
pthread_t cons, prod; pthread_create(&prod, NULL, producer, NULL); pthread_create(&cons, NULL, consumer, NULL);
pthread_t cons1, cons2; pthread_create(&cons1, NULL, consumer, NULL); pthread_create(&cons2, NULL, consumer, NULL); pthread_join(cons1, NULL); pthread_join(cons2, NULL);
pthread_join(prod, NULL); pthread_join(cons, NULL); return 0; }
|
sem 信号量
进化版的互斥锁,加锁的线程数量 N 可以自定义,即同时访问数据的线程数量可以为 N。
初值为N,并不是最大值为N,但若调用sem_post,N值还会变大!
同样是建议锁,虽然支持多次加锁,但信号量本身并不能保证数据不紊乱,而是需要底层数据结构支持并发操作。
信号和信号量毫无关系!!
1 2 3 4 5 6 7 8 9 10
| #include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value); int sem_destroy(sem_t *sem);
int sem_wait(sem_t *sem); int sem_trywait(sem_t *sem); int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
int sem_post(sem_t *sem);
|
信号量可以应用与线程、进程之间的同步!
sem_init 函数
初始化信号量,可以设置是否共享,以及初始值。
1 2 3
| #include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
|
sem
:信号量的地址
pshared
:是否共享,0
(用于线程间同步),1
(用于进程间同步,在进程间共享)
value
:N 值,可同时访问数据的线程数量.
- 返回值:0(成功),-1(失败)
3类信号量
信号量有三种类型:
- Posⅸ 有名信号量:可用于进程或线程间的同步。
- Posix 无名信号量:基于内存的信号量,存放在内存区中,可用于进程或线程间的同步。常用于多线程间同步。
- System V信号量(IPC机制):在内核中维护,可用于进程或线程间的同步。常用于进程间同步。
在多线程下,全局变量可在线程间共享,因此将信号量设置为全局变量即可。但在多进程环境下,每个进程间并不共享变量!这三类信号量的共享机制为:
- 有名信号量通过文件系统中的路径名对应的文件名进行维护(信号量只是通过文件名进行标识,信号量的值并不存放到这个文件中,除非信号量存放在空间映射到这个文件上)。
- 无名信号量通过用户空间内存进行维护,无名信号量要想在进程间通信,该内存必须为共享内存区*。如上面的
pshared
参数!
- System V信号量(IPC机制)并不在用户空间创建,它由内核维护。它的标识是使用
IPC Key
!
生产者消费者实现2
这个与上面基于条件变量的实现逻辑不同,建议先看视频基于信号量实现生产者消费者模型理解实现的原理,再看具体实现代码。
注意,对信号量初始化时,将 product_num
初始化为0,但仍然可以调用sem_post
, sem_wait
等函数,这与上面所说的最多只能有 N (这里为0) 是不是矛盾?怎么实现多个消费者?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| #include <pthread.h> #include <semaphore.h> #include <stdio.h> #include <stdlib.h> #include <time.h> #include <unistd.h>
#define NUM 10
sem_t blank_num; sem_t product_num;
int products[NUM];
void *producer(void *arg) { int i = 0; while(1) { sem_wait(&blank_num);
products[i] = rand() % 1000; printf("produce product: %d in index: %d\n", products[i], i); sem_post(&product_num);
i = (i+1) % NUM; sleep(rand() % 3); } return NULL; }
void *consumer(void *arg) { int i = 0; while(1) { sem_wait(&product_num);
printf("consume product: %d in index: %d\n", products[i], i); sem_post(&blank_num);
i = (i+1) % NUM; sleep(rand() % 3); } return NULL; }
int main() { pthread_t prod, cons;
sem_init(&product_num, 0, 0); sem_init(&blank_num, 0, NUM);
pthread_create(&prod, NULL, producer, NULL); pthread_create(&cons, NULL, consumer, NULL);
pthread_join(prod, NULL); pthread_join(cons, NULL); return 0; }
|
对比总结
pthread_mutex_t |
pthread_rwlock_t |
pthread_cond_t |
sem_t |
pthread_mutex_init |
pthread_rwlock_init |
pthread_cond_init |
sem_init |
pthread_mutex_lock |
pthread_rwlock_wrlock pthread_rwlock_rdlock |
pthread_cond_wait |
sem_wait |
pthread_mutex_trylock |
pthread_rwlock_trywrlock pthread_rwlock_tryrdlock |
|
sem_trywait |
|
|
pthread_cond_timedwait |
sem_timedwait |
pthread_mutex_unlock |
pthread_rwlock_unlock |
pthread_cond_signal pthread_cond_broadcast |
sem_post |
pthread_mutex_destroy |
pthread_rwlock_destroy |
pthread_cond_destroy |
sem_destroy |
下面的方法有助于理解其实现原理,但不是真实情况:
mutex
可以看作一个整数,且只有两种取值:0和1。
- init :将
i
的值设置为1。
- lock :相当于
i--
,将 i
的值从 1 变为 0。如果 i
为0,则阻塞。
- unlock :相当于
i++
,若 i 已经为 1,i 将不会变化。
sem
相当于初值为 N 的互斥量
- init :初始化
i
,初始值为 N。
- wait :相当于
i--
,若 i
为0,则阻塞。
- post :相当于
i++
,若 i
已经达到 N,还是会 ++
!(注意与 mutex
区分)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| int main() { pthread_mutex_t m; pthread_mutex_init(&m, NULL); pthread_mutex_unlock(&m); pthread_mutex_lock(&m); printf("lock 1\n"); pthread_mutex_lock(&m); printf("lock 2\n"); return 0; }
int main() { sem_t s; sem_init(&s, 1, 3); sem_post(&s); sem_wait(&s); printf("wait 1\n"); sem_wait(&s); printf("wait 2\n"); sem_wait(&s); printf("wait 3\n"); sem_wait(&s); printf("wait 4\n"); sem_wait(&s); printf("wait 5\n"); sem_destroy(&s); return 0; }
|
内核锁
上面几种锁都是在用户态使用,在内核中,也有类似的实现。
头文件 |
结构 |
描述 |
<asm/semaphore.h> |
struct semaphore |
信号量/互斥量 |
<linux/rwsem.h> |
struct rw_semaphore |
读写锁 |
<linux/completion.h> |
struct completion |
类似条件变量 |
<linux/spinlock.h> |
spinlock_t |
自旋锁 |
<linux/spinlock.h> |
rwlock_t |
自旋读写锁 |
加锁总会影响系统的性能,在一些场景下可以考虑不加锁的算法!如:环形队列、原子变量、位操作、顺序锁seqlock
、读取-拷贝-更新(RCU)。
锁陷阱
- 不允许一个持锁者第 2 次请求锁!
- 获得多个锁可能是危险的,当多个锁必须获得时,它们应当一直以同样顺序获得!
- 尽量保证锁的粒度,越小越好(访问共享数据前,加锁,访问结束立即解锁)。
相关资料