并发编程4

2022/4/22 1:12:39

本文主要是介绍并发编程4,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

目录
  • GIL与普通互斥锁区别
  • 验证多线程是否有用
    • 单个CPU
    • 多个CPU
  • 死锁现象
  • 信号量与event事件
    • 信号量
    • event事件
  • 进程池与线程池
    • 线程池
    • 进程池
  • 协程
  • 基于协程实现TCP服务端并发
    • 服务端
    • 客户端

GIL与普通互斥锁区别

# 验证GIL的存在
from threading import Thread

count = 100


def task():
    global count
    count -= 1


for i in range(100):  # 创建一百个线程
    t = Thread(target=task)
    t.start()
print(count)  # 0 线程的执行不是同时进行的,没有造成数据错乱
# 验证不同数据加不同锁
from threading import Thread, Lock
import time

count = 100
mutex = Lock()


def task():
    global count  # global关键字声明
    mutex.acquire()
    temp = count
    time.sleep(0.1)
    count = temp - 1
    mutex.release()


t_list = []
current_time = time.time()
for i in range(100):  # 创建一百个线程
    t = Thread(target=task)
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
    print(count)  # 100-0
# 等待所有的线程运行完毕再打印money
print(count, time.time() - current_time)  # 0 10.867115020751953

验证多线程是否有用

单个CPU

多个IO密集型任务
多进程:浪费资源 无法利用多个CPU
多线程:节省资源 切换+保存状态
多个计算密集型任务
多进程:耗时更长 创建进程的消耗+切换消耗
多线程:耗时较短 切换消耗

多个CPU

多个IO密集型任务
多进程:浪费资源 多个CPU利用率低
多线程:节省资源 切换+保存状态
多个计算密集型任务
多进程:利用多核 速度更快
多线程:速度较慢

# 计算密集型 
from threading import Thread
from multiprocessing import Process
import os
import time


def work():
    res = 1
    for i in range(1, 2000000):
        res += i
    print(res)


if __name__ == '__main__':
    print(os.cpu_count())  # 4  查看当前计算机CPU个数
    # process_start_time = time.time()
    # p_list = []
    # for i in range(4):
    #     p = Process(target=work)
    #     p.start()
    #     p_list.append(p)
    # for p in p_list:
    #     p.join()
    # print('多进程总耗时:%s' % (time.time() - process_start_time))  # 总耗时:1.0024194717407227
    thread_start_time = time.time()
    t_list = []
    for i in range(4):
        t = Thread(target=work)
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print('多线程总耗时:%s' % (time.time() - thread_start_time))  # 总耗时:0.753563642501831
# IO密集型
from threading import Thread
from multiprocessing import Process
import time


def work():
    time.sleep(1)   # 模拟IO操作


if __name__ == '__main__':
    thread_start_time = time.time()
    t_list = []
    for i in range(100):
        t = Thread(target=work)
        t.start()
    for t in t_list:
        t.join()
    print('多线程总耗时:%s' % (time.time() - thread_start_time))  # 总耗时:0.017986774444580078
    process_start_time = time.time()
    p_list = []
    for i in range(100):
        p = Process(target=work)
        p.start()
    for p in p_list:
        p.join()
    print('多进程总耗时:%s' % (time.time() - process_start_time))  # 总耗时:0.6566195487976074

死锁现象

即便设定了如何抢锁和放锁也会产生死锁现象

from threading import Thread, Lock
import time


mutexA = Lock()
mutexB = Lock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(f'{self.name}抢到了A锁')
        mutexB.acquire()
        print(f'{self.name}抢到了B锁')
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(f'{self.name}抢到了B锁')
        time.sleep(2)
        mutexA.acquire()
        print(f'{self.name}抢到了A锁')
        mutexA.release()
        mutexB.release()


for i in range(20):
    t = MyThread()
    t.start()

信号量与event事件

信号量

from threading import Thread, Semaphore
import time
import random

sp = Semaphore(5)  # 创建一个有五个停车位的停车场


def task(car_num):
    sp.acquire()  # 抢锁
    print('%s正在倒车入库' % car_num)
    time.sleep(random.randint(1, 5))
    sp.release()  # 放锁
    print(f'{car_num}开走了')


for i in range(1, 10):
    t = Thread(target=task, args=('车辆沪B0000%s' % i, ))
    t.start()

event事件

from threading import Thread, Event
import time

event = Event()  # 类似于造了一个红绿灯


def light():
    print('红灯亮,所有车驻车等待')
    time.sleep(30)
    print('绿灯亮,可以通行')
    event.set()


def car(name):
    print('%s正在等红灯' % name)
    event.wait()
    print('%s加油门,缓慢起步' % name)


t = Thread(target=light)
t.start()
for i in range(1, 11):
    t = Thread(target=car, args=('五菱mini%s' % i,))
    t.start()

进程池与线程池

线程池

from concurrent.futures import ThreadPoolExecutor
import time
import os

# 线程池
pool = ThreadPoolExecutor(5)  # 线程池线程数默认是CPU个数的五倍,支持自定义


def task(n):
    time.sleep(1)
    print(n)
    print(os.getpid())
    return '任务的执行结果:%s' % n**2


def func(*args, **kwargs):
    print(args, kwargs)
    print(args[0].result())


for i in range(20):
    res = pool.submit(task, i)
    print(res.result())
    pool.submit(task, i).add_done_callback(func)

进程池

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process
import time
import os

# 进程池
pool = ProcessPoolExecutor(5)  # 进程池进程数默认是CPU个数,支持自定义


def task(n):
    time.sleep(2)
    print(n)
    print(os.getpid())
    print(current_process().name)


def func(*args, **kwargs):
    print(args, kwargs)
    print(args[0].result())
    

if __name__ == '__main__':
    for i in range(20):
        pool.submit(task, i).add_done_callback(func)

协程

协程,单线程下实现并发。程序员通过代码来检测程序的IO操作并处理,使得CPU感觉不到IO的存在从而最大幅度的占用CPU

from gevent import monkey;monkey.patch_all()
from gevent import spawn
import time


def buy(name):
    print('%s buy uc' % name)
    time.sleep(5)
    print('%s buy eleme' % name)


def found(name):
    print('%s found alibaba' % name)
    time.sleep(3)
    print('%s found alipay' % name)


start_time = time.time()
g1 = spawn(buy, 'jack_ma')
g2 = spawn(found, 'jack_ma')
g1.join()
g2.join()
print('总耗时:', time.time() - start_time)  # 总耗时: 5.3707029819488525

基于协程实现TCP服务端并发

服务端

from gevent import monkey;monkey.patch_all()
from gevent import spawn
import socket


def communication(sock):
    while True:
        data = sock.recv(1024)  # IO操作
        print(data.decode('utf8'))
        sock.send(data.upper())


def get_server():
    server = socket.socket()
    server.bind(('127.0.0.1', 8080))
    server.listen(5)
    while True:
        sock, addr = server.accept()  # IO操作
        spawn(communication, sock)


g1 = spawn(get_server)
g1.join()

客户端

from threading import Thread, current_thread
import socket


def get_client():
    client = socket.socket()
    client.connect(('127.0.0.1', 8080))
    count = 0
    while True:
        send_msg = input('send_msg>>>:').strip()
        msg = '%s %s %s' % (current_thread().name, send_msg, count)
        count += 1
        client.send(msg.encode('utf8'))
        data = client.recv(1024)
        print(data.decode('utf8'))


# 创建多线程
for i in range(20):
    t = Thread(target=get_client)
    t.start()


这篇关于并发编程4的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程