17. 并发执行¶
17.1. threading¶
提供基础的并行操作
主要方法:
threading.active_count: 活动个数
therading.current_thread: 当前线程
threading.get_ident: 获取线程的identifier
threading.enumerate:线程遍历
threading.main_thread: 主进程
threading.settrace: 设置trace函数
threading.setprofile: 设置profile函数
threading.stack_size: 设置stack大小
threading.TIMEOUT_MAX: 线程超时最大值
17.1.1. threading.Thread¶
start: 启动一个线程
run: 运行线程
join: 等待线程中断
name: 名字
getName: 获取名字
setName: 设置名字
ident: 进程唯一标识符
is_alive: 是否存活的
daemon: 是否是守护进程
isDaemon: 是否是守护进程
setDaemon: 设置为守护进程
17.1.2. threading.Lock¶
acqure: 请求一个锁
release: 释放锁
17.1.3. threading.RLock¶
acqure: 请求一个锁
release: 释放锁
17.1.4. threading.Condition¶
acqure: 请求一个锁
release: 释放锁
wait: 等待一直到一个通知或者超时
wait_for: 等待到一个条件为true
notify: 通知
notify_all: 通知所有
17.1.5. threading.Event¶
is_set:只有内部标记为true返回true
set: 设置内部标记
clear: 清空时间
wait: 阻塞一直到内部标记为true
17.1.6. threading.Timer¶
cancel: 取消
17.1.7. threading.Barrier¶
wait: 等待
reset: 重置
abort: 中断
parties: 需要pass的线程个数
n_waiting: 当前等待个数
broken: 是否是broken状态
17.2. multiprocessing¶
类似线程的多处理模块
Pool样例使用
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
Process样例使用
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
两个进程通信样例
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
进程同步样例
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
共享内存样例
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
17.3. multiprocessing.Process¶
主要方法
run: 运行
start:启动
join: 等待
name:名字
is_alive: 是否存活
daemon: 是否是守护进程
pid: 进程id
exitcode: 退出码
authkey: 认证key
sentinel: 系统对象数字句柄
terminate: 中断进程
17.4. multiprocessing.Process¶
返回一个(conn1,conn2)的连接对象。
17.5. multiprocessing.Queue¶
主要方法
qsize: 队列大小
empty: 是否为空
full: 是否满了
put: 添加对象
put_nowait: 不等待添加
get: 移除并返回一个对象
get_noewait: 不等待移除对象
close: 关闭
join_thread: 等待后台进程
cancel_join_thread: 取消join进程
17.6. multiprocessing.Queue¶
17.7. multiprocessing.JoinableQueue¶
17.8. multiprocessing.connection.Connection¶
主要方法
send: 发送对象
recv: 接受对象
fileno: 文件描述符
close: 关闭
poll: 返回是否有数据可读
send_bytes: 发送字节对象
recv_bytes: 接受字节对象
recv_bytes_into: 接受字节到一个buffer
17.8.1. 共享对象¶
multiprocessing.Value
multiprocessing.Array
multiprocessing.sharedctypes.RawArray
multiprocessing.sharedctypes.RawValue
multiprocessing.sharedctypes.Array
multiprocessing.sharedctypes.Value
multiprocessing.sharedctypes.copy
multiprocessing.sharedctypes.synchronized
17.8.2. 进程管理者¶
multiprocessing.managers.BaseManager
主要方法
start: 启动
get_server:获取server对象
connect: 连接
shutdown: 关闭
register: 注册
address: 管理者使用地址
multiprocessing.managers.SyncManager
主要方法
Barrier: 创建一个共享的屏障对象
Bounded: 创建一个有界信号量
Event: 创建一个事件对象
Lock: 创建一个共享锁对象
Namespace: 创建一个共享的namespace
Queue: 创建一个共享的队列
Rlock: 创建一个共享锁
Semaphore: 创建一个信号量
Array: 创建一个数组
Value: 创建一个数值
dict: 字典
list: 列表
multiprocessing.pool.Pool
主要方法:
apply: 给定的函数和参数去调用
apply_async: 异步调用
map: 并行操作
map_async: 异步操作
imap: lazier版本的map
imap_unordered:类似imap,结果是乱的
starmap: 和map类似
startmap_async:
close: 关闭
terminate: 中断
join: 等待进程去退出
multiprocessing.pool.AsyncResult
主要方法:
get: 返回结果如果结果到达
wait: 等待数秒等结果到来
ready: 返回是否调用完毕
successful: 返回是否完成且没有异常
日志
multiprocessing.get_logger()获取日志
multiprocessing.log_to_stderr() 日志输出到标准错误去。