1、计算机基础
- 三部分
- 软件
- 操作系统
提供硬件操作接口
管理调度进程
- 硬件
cpu
内存(速度快、断电数据消失)
硬盘(速度慢、永久保存数据)
- 批处理系统
- 串行执行程序
一个一个按顺序执行程序
- 串行执行程序
- 多道技术
- 空间复用
内存存储多个程序
- 时间复用
减少cpu等待时间(减少IO阻塞)
- 并发执行
看起来同时执行
单核并发,多核并行
- 空间复用
- 分时操作系统
- 多个程序进程之间的内存互相隔离
2、进程
- 状态
- 运行 --> 就绪 --> 运行
- 运行 --> 阻塞 --> 就绪 --> 运行
- 使用方法
- multiprocessing模块.Process对象
import os
from multiprocessing import Process


def func():
    """Child-process target: report the PID this code runs under (e.g. 71980)."""
    child_pid = os.getpid()
    print('子进程,ID:', child_pid)


if __name__ == '__main__':
    # Spawn a child that executes func(); the parent keeps running meanwhile.
    proc = Process(target=func)
    proc.start()
    print('主进程,ID:', os.getpid())  # parent PID, e.g. 71978
- 自定义模块
import os
from multiprocessing import Process


class MyProcess(Process):
    """Process subclass whose run() executes in the child once start() is called."""

    def __init__(self):
        super().__init__()
        # __init__ runs in the creating (parent) process, so this is the parent's PID.
        print('init,ID:', os.getpid())

    def run(self):
        # run() is what start() executes inside the new child process.
        print('子进程,ID:', os.getpid())


if __name__ == '__main__':
    worker = MyProcess()
    worker.start()  # invokes run() in a new process
    print('主进程,ID:', os.getpid())
- 其它方法
t.join()        # block until the child process has finished
t.is_alive()    # True while the process is still running
t.pid           # PID of the child process (None before start())
t.terminate()   # terminate the process
t.name          # process name
- multiprocessing模块.Process对象
- 僵尸进程
- 子进程已经结束(如被kill),但父进程还在且没有调用wait()回收它
- 缺点:父进程既不回收也不结束时,僵尸进程会一直占用系统PID号
- 孤儿进程
- 父进程先结束(如被kill),子进程还在运行
- 子进程被init进程(PID 1)收养并由它负责回收,无害
- 进程之间内存隔离
from multiprocessing import Process

n = 10


def func():
    """Rebind the module-level n to 0, but only in the process that runs this."""
    global n
    n = 0
    print('子进程', n)


if __name__ == '__main__':
    child = Process(target=func)
    child.start()
    child.join()
    # The child worked on its own copy of memory, so the parent's n is still 10.
    print('主进程', n)
- 守护进程
- 父进程结束,守护进程也跟着结束
- 守护进程内不能再开启子进程
- 用法:t.daemon = True(必须在t.start()之前设置)
import time
from multiprocessing import Process


def func():
    """Print a counter once per second, ten times."""
    for tick in range(10):
        time.sleep(1)
        print('子进程', tick)


if __name__ == '__main__':
    worker = Process(target=func)
    worker.daemon = True  # must be set before start()
    worker.start()
    time.sleep(3)
    # When the main process finishes, the daemon child is terminated with it,
    # so only the first couple of ticks get printed.
    print('主进程')
- 互斥锁
- 与join的区别
互斥锁可以把代码局部锁定(如对共享数据的修改)
join把整个进程锁定
import time
from multiprocessing import Process, Lock


def func(mutex, i):
    """Print three numbered lines for worker i while holding mutex.

    Holding the lock for the whole loop keeps each worker's lines together
    instead of interleaving them with the other worker's output.
    """
    # `with` releases the lock even if the body raises; a bare
    # acquire()/release() pair would leave it held forever in that case.
    with mutex:
        for j in range(3):
            time.sleep(1)
            print('process %s:%s' % (i, j))


def process():
    """Start two worker processes that share a single Lock."""
    mutex = Lock()
    for i in range(2):
        p = Process(target=func, args=(mutex, i))
        p.start()


if __name__ == '__main__':
    process()
- 与join的区别
- 队列
q = Queue(3)  # bounded queue: maxsize 3
q.put(msg)  # enqueue msg
msg = q.get()  # dequeue the oldest item
- 生产者消费者模型,Queue
import time
from multiprocessing import Queue, Process


def producer(q):
    """Put five items onto q, one every 0.5 s."""
    for i in range(5):
        time.sleep(0.5)
        res = '包子%s' % i
        print('生产者生产了%s' % res)
        q.put(res)


def customer(q):
    """Consume items from q until the None sentinel arrives."""
    while True:
        time.sleep(0.3)
        res = q.get()
        # Compare against the sentinel explicitly: `not res` would also stop
        # on legitimate falsy payloads such as '' or 0.
        if res is None:
            break
        print('消费者吃了%s' % res)


if __name__ == '__main__':
    q = Queue()
    producers = []
    for i in range(3):
        p = Process(target=producer, args=(q,))
        p.start()
        producers.append(p)
    consumers = [Process(target=customer, args=(q,)) for _ in range(2)]
    for c in consumers:
        c.start()
    # Wait for EVERY producer, not just the last one started; otherwise a
    # slower producer's items could be queued behind a None sentinel.
    for p in producers:
        p.join()
    # One sentinel per consumer so that each of them exits.
    for _ in consumers:
        q.put(None)
- 生产者消费者模型,JoinableQueue
import time
from multiprocessing import JoinableQueue, Process


def producer(q):
    """Put five items onto q, then wait until they have been processed."""
    for i in range(5):
        time.sleep(0.5)
        res = '包子%s' % i
        print('生产者生产了%s' % res)
        q.put(res)
    # Blocks until every item put on q so far has been matched by a
    # task_done() call (not merely until the queue looks empty).
    q.join()


def customer(q):
    """Consume items forever, acknowledging each one with task_done()."""
    while True:
        time.sleep(0.3)
        res = q.get()
        print('消费者吃了%s' % res)
        q.task_done()  # one acknowledgement per get(); lets q.join() return


if __name__ == '__main__':
    q = JoinableQueue()
    producers = []
    for i in range(3):
        p = Process(target=producer, args=(q,))
        p.start()
        producers.append(p)
    consumers = [Process(target=customer, args=(q,)) for _ in range(2)]
    for c in consumers:
        # Consumers loop forever; as daemons they are terminated when the
        # main process exits instead of keeping it alive.
        c.daemon = True
        c.start()
    # Join every producer, not only the last one started.
    for p in producers:
        p.join()
    # Make sure everything produced has been consumed before the main process
    # exits and takes the daemon consumers down with it.
    q.join()
3、线程
- 用法
- threading模块.Thread类
from threading import Thread


def func(name):
    """Thread target: echo the given name."""
    print(name)


if __name__ == '__main__':
    worker = Thread(target=func, args=('子线程',))
    worker.start()
    print('主线程')
- 自定义
from threading import Thread


class MyThread(Thread):
    """Thread subclass that prints its own name when run."""

    def __init__(self, name):
        super().__init__()
        # Thread.name is a settable property, so this names the thread.
        self.name = name

    def run(self):
        print(self.name)


if __name__ == '__main__':
    MyThread('子线程').start()
    print('主线程')
- threading模块.Thread类
- 进程与线程区别
- 开进程的开销远大于线程
- 同一进程内的多线程共享该进程的地址空间
from threading import Thread
from multiprocessing import Process

n = 10


def func(name):
    """Set the module-level n to 0 and print it under the given label."""
    global n
    n = 0
    print(name, n)


if __name__ == '__main__':
    # Swap in this Process version to see that a child process does NOT
    # change the parent's n:
    # p = Process(target=func, args=('子进程',))
    # p.start()
    # p.join()
    worker = Thread(target=func, args=('子线程',))
    worker.start()
    worker.join()
    # Threads share their process's memory, so this prints 0.
    print('主线程', n)
- 同一进程内多线程PID相同
from threading import Thread
from multiprocessing import Process
import os


def func(name):
    """Print the process ID and its parent's ID, as seen from inside a thread.

    os.getpid() returns the *process* ID, which every thread of the process
    shares; it is not a per-thread ID, so the labels say so.
    """
    print('子线程所在进程id:%s,父进程id:%s' % (os.getpid(), os.getppid()))


if __name__ == '__main__':
    # p = Process(target=func, args=('子进程',))
    # p.start()
    # p.join()
    t = Thread(target=func, args=('子线程',))
    t.start()
    t.join()
    # Same value the thread printed: both run inside one process.
    print('主线程所在进程id:%s' % os.getpid())
- 补充(终端查看pid方法)
Windows:
tasklist |findstr python
Linux:ps aux |grep python
- 其它方法
from threading import Thread, current_thread, active_count
from threading import enumerate as enumerate_threads  # avoid shadowing builtin enumerate
import os


def func(name):
    """No-op thread target."""
    pass


if __name__ == '__main__':
    t = Thread(target=func, args=('子线程',))
    t.start()
    t.join()
    # Thread.name replaces getName()/setName(), deprecated since Python 3.10.
    print('%s:%s' % (current_thread().name, os.getpid()))  # e.g. MainThread:4898
    current_thread().name = '主线程'
    print('%s:%s' % (current_thread().name, os.getpid()))  # e.g. 主线程:4898
    print(t.is_alive())  # False: t has been joined
    print(active_count())  # 1: only the main thread is left
    print(enumerate_threads())  # e.g. [<_MainThread(主线程, started 4754021824)>]
- 守护线程
- 所有非守护线程结束后,主线程才跟着结束
- 主线程一旦结束,所有守护线程也跟着结束
- 使用方法
t.daemon = True  # must be set before t.start()
# t.setDaemon(True)  # older equivalent (takes the bool True); deprecated since Python 3.10
- 互斥锁
import time
from threading import Thread, Lock

n = 100


def a(lock):
    """Decrement the shared counter n by one while holding lock.

    The read-sleep-write sequence is deliberately non-atomic; without the
    lock, concurrent threads would read the same value and lose updates.
    """
    global n
    with lock:  # released even if the body raises
        temp = n
        time.sleep(0.0000001)  # widen the race window so the demo is visible
        n = temp - 1


if __name__ == '__main__':
    t_l = []
    lock = Lock()
    for i in range(100):
        t = Thread(target=a, args=(lock,))
        t.start()
        t_l.append(t)
    for t in t_l:
        t.join()
    print(n)  # 0
- GIL锁
- 保证同一个进程下的所有线程在同一时刻只有一个运行
GIL.acquire() 解释器的代码 # 保证python解释器垃圾回收机制是线程安全的 GIL.release()
- 同一进程内的多线程只能并发,不能并行(CPython)
- 保证同一个进程下的所有线程在同一时刻只有一个运行
- 计算密集型,推荐多进程
import time
from threading import Thread
from multiprocessing import Process


def func():
    """CPU-bound busy loop: accumulate 0 + 1 + ... + 99_999_999 and print it."""
    acc = 0
    for k in range(100000000):
        acc += k
    print(acc)


if __name__ == '__main__':
    started = time.time()
    workers = []
    for _ in range(2):
        # Threads take ~14 s here because the GIL lets only one run at a time;
        # swap in the Process line for ~8 s with real parallelism.
        w = Thread(target=func)
        # w = Process(target=func)
        w.start()
        workers.append(w)
    for w in workers:
        w.join()
    print(time.time() - started)
- IO密集型,推荐多线程
import time
from threading import Thread
from multiprocessing import Process


def func():
    """IO-bound stand-in: just sleep for one second."""
    time.sleep(1)


if __name__ == '__main__':
    began = time.time()
    pending = []
    for _ in range(50):
        # 50 threads finish in ~1.01 s because sleeping releases the GIL;
        # the Process line takes ~2.51 s, presumably process start-up cost.
        job = Thread(target=func)
        # job = Process(target=func)
        job.start()
        pending.append(job)
    for job in pending:
        job.join()
    print(time.time() - began)
- 死锁
- Lock的锁只能acquire一次
import time
from threading import Thread, Lock, current_thread

mutex1 = Lock()
mutex2 = Lock()


def func():
    """Deadlock demo: take two locks in opposite orders in two phases.

    Phase 1 acquires mutex1 then mutex2; phase 2 acquires mutex2 then mutex1.
    With several threads, one can hold mutex2 (phase 2) while another holds
    mutex1 (phase 1), and each then waits forever for the other's lock.
    A plain Lock is not re-entrant: acquiring it again before release blocks.
    """
    # current_thread().name replaces getName(), deprecated since Python 3.10.
    mutex1.acquire()
    print(1, current_thread().name)
    mutex2.acquire()
    print(2, current_thread().name)
    mutex2.release()
    mutex1.release()

    mutex2.acquire()
    print(2, current_thread().name)
    time.sleep(0.1)  # hold mutex2 long enough for another thread to grab mutex1
    mutex1.acquire()
    print(1, current_thread().name)
    mutex1.release()
    mutex2.release()


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=func)
        t.start()
- Lock的锁只能acquire一次
- 递归锁
- RLock的锁可以acquire多次
import time
from threading import Thread, RLock, current_thread

# Both names refer to ONE re-entrant lock: the thread that owns it may
# acquire it again (each acquire needs a matching release), and other
# threads wait, so the opposite-order pattern below cannot deadlock.
mutex2 = mutex1 = RLock()
# Two separate RLocks would still deadlock; re-entrancy only helps the owner:
# mutex1 = RLock()
# mutex2 = RLock()


def func():
    """Same two-phase acquisition as the deadlock demo, made safe by one RLock."""
    # current_thread().name replaces getName(), deprecated since Python 3.10.
    mutex1.acquire()
    print(1, current_thread().name)
    mutex2.acquire()
    print(2, current_thread().name)
    mutex2.release()
    mutex1.release()

    mutex2.acquire()
    print(2, current_thread().name)
    time.sleep(0.1)
    mutex1.acquire()
    print(1, current_thread().name)
    mutex1.release()
    mutex2.release()


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=func)
        t.start()
- RLock的锁可以acquire多次
- 信号量
- 信号量也是一把锁,与互斥锁不同的是,信号量可以设置同时拿到锁的线程的个数
import random
import time
from threading import Thread, Semaphore, current_thread

sm = Semaphore(3)  # at most three threads inside `with sm:` at the same time


def func():
    """Print this thread's name, then sleep 1-3 s while holding a semaphore slot."""
    # `with sm:` is equivalent to sm.acquire() ... sm.release(), but the slot
    # is released even if the body raises.
    with sm:
        print(current_thread().name)  # .name replaces deprecated getName()
        time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=func)
        t.start()
- 信号量也是一把锁,与互斥锁不同的是,信号量可以设置同时拿到锁的线程的个数
- event
import time
from threading import Thread, Event, current_thread

event = Event()


def conn():
    """Wait for `event` in up to three 1-second rounds; give up if it never fires."""
    for _ in range(3):
        if event.is_set():
            break
        print('waiting', current_thread().getName())
        event.wait(1)  # returns early as soon as event.set() is called
    else:
        # All three rounds elapsed; check one last time before giving up.
        if not event.is_set():
            print('nooo')
            return
    print('okk', current_thread().getName())


def check():
    """After 4 s, set the event that conn() threads are waiting on."""
    print('checking', current_thread().getName())
    time.sleep(4)
    event.set()


if __name__ == '__main__':
    for _ in range(3):
        Thread(target=conn).start()
    checker = Thread(target=check)
    checker.start()
- 定时器
from threading import Timer
import random


class MakeQR:
    """Regenerate a random one-digit code every 3 s until the user types it."""

    def __init__(self):
        self.make_cache()

    def make_cache(self):
        """Generate and print a new code, then schedule the next refresh in 3 s."""
        self.qr = self.make_qr()
        print(self.qr)
        self.t = Timer(3, self.make_cache)
        # Daemon timers cannot keep the interpreter alive on their own. If
        # check() exits through an exception (e.g. EOFError/KeyboardInterrupt
        # from input()), the refresh chain no longer runs forever.
        self.t.daemon = True
        self.t.start()

    def make_qr(self):
        """Return a random code: '1', '2' or '3'."""
        res = random.randint(1, 3)
        return str(res)

    def check(self):
        """Prompt until the input matches the current code, then stop refreshing."""
        while True:
            inp = input('>>:')
            if inp == self.qr:
                print('okk')
                self.t.cancel()
                break


if __name__ == '__main__':
    obj = MakeQR()
    obj.check()
4、线程Queue
- 先进先出 - 队列Queue
import queue


def fifo_queue_demo(timeout=2):
    """Show how put()/get() behave on a full / empty queue.Queue.

    The original one-liner raised queue.Full on its second statement and also
    contained calls that block forever. Blocking-forever calls are kept only
    as comments; the raising ones are wrapped so the demo runs to the end.

    :param timeout: seconds the timed put()/get() wait before raising.
    """
    q = queue.Queue(1)
    q.put(1)  # the single slot is now taken

    # q.put(4, block=True)  # would block forever: the queue stays full
    try:
        q.put_nowait(4)  # same as q.put(4, block=False): raises at once
    except queue.Full:
        print('put_nowait: queue.Full')
    try:
        q.put(4, block=True, timeout=timeout)  # waits `timeout` s, then raises
    except queue.Full:
        print('put timeout: queue.Full')

    print(q.get())  # 1
    # q.get(block=True)  # would block forever: the queue is empty now
    try:
        q.get_nowait()  # same as q.get(block=False): raises at once
    except queue.Empty:
        print('get_nowait: queue.Empty')
    try:
        q.get(block=True, timeout=timeout)  # waits `timeout` s, then raises
    except queue.Empty:
        print('get timeout: queue.Empty')


if __name__ == '__main__':
    fifo_queue_demo()
- 后进先出 - 堆栈LifoQueue
q = queue.LifoQueue(3)  # stack: last in, first out; maxsize 3
- 优先级队列 - PriorityQueue
import queue

# Entries are (priority, payload) tuples; the smallest number comes out first.
q = queue.PriorityQueue(3)
for entry in ((1, 'one'), (3, 'two'), (2, 'three')):
    q.put(entry)

for _ in range(3):
    print(q.get())  # (1, 'one'), then (2, 'three'), then (3, 'two')
5、池
- 进程池
from concurrent.futures import ProcessPoolExecutor
pool = ProcessPoolExecutor(3)  # pool of 3 worker processes
pool.submit(run)  # schedule run() on the pool; returns a Future
- 线程池
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(3)  # pool of 3 worker threads
pool.submit(run)  # schedule run() on the pool; returns a Future
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time


def run():
    """Task: print the PID of the worker running it, then sleep 2 s."""
    print(os.getpid())
    time.sleep(2)


if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)  # process pool
    # pool = ThreadPoolExecutor(3)  # thread pool
    for i in range(10):
        pool.submit(run)  # no start() needed; the pool schedules the task
    pool.shutdown(wait=True)  # wait for all submitted tasks to finish
    print('main')
- 用Queue限制同时运行的线程数(模拟线程池,但线程并不复用)
from threading import Thread
from queue import Queue
import os
import time


def run(q):
    """Task: print the PID, sleep 2 s, then free one slot in q."""
    try:
        # getpid() for consistency with the executor-based pool demos
        # (the original printed getppid(), the parent process's ID).
        print(os.getpid())
        time.sleep(2)
    finally:
        # Free the slot even if the work above raises; otherwise the main
        # loop's q.put() would eventually block forever.
        q.get()


if __name__ == '__main__':
    # A bounded queue used as a counter: at most 3 tasks run at once.
    # Each task still gets a brand-new thread; threads are not reused.
    q = Queue(3)
    for i in range(10):
        t = Thread(target=run, args=(q,))
        q.put(1)  # take a slot before starting; blocks while 3 tasks are running
        t.start()
    print('main')
6、同步异步
- 同步调用
- 提交任务后,原地等待任务执行完毕并拿到结果,才执行下一行代码,导致程序串行执行
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time


def run():
    """Task: print the worker's PID, then sleep 2 s."""
    print(os.getpid())
    time.sleep(2)


if __name__ == '__main__':
    # pool = ProcessPoolExecutor(3)  # process pool
    pool = ThreadPoolExecutor(3)  # thread pool
    for i in range(10):
        # .result() blocks until this task has finished, so the ten tasks
        # run one after another (synchronous calls) despite 3 workers.
        pool.submit(run).result()
    pool.shutdown(wait=True)
    print('main')
- 异步调用
- 提交任务后,不用等待任务结束,立即执行下一行代码,任务并发执行;结果由回调函数处理
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time


def run():
    """Task: print the worker's PID, sleep 2 s and return 'okk'."""
    print(os.getpid())
    time.sleep(2)
    return 'okk'


def func(r):
    """Done-callback: print the result of a finished task.

    :param r: the Future returned by pool.submit(run). The pool invokes this
        callback when that future finishes, so r.result() returns at once.
    """
    res = r.result()
    print(res)


if __name__ == '__main__':
    # pool = ProcessPoolExecutor(3)  # process pool
    pool = ThreadPoolExecutor(3)  # thread pool
    for i in range(10):
        # submit() returns immediately; func is called with the Future when
        # the task completes (asynchronous call). Pass the function itself.
        pool.submit(run).add_done_callback(func)
    pool.shutdown(wait=True)
    print('main')