calculute_util.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. import time
  2. from multiprocessing import Process, Queue, cpu_count
  3. import traceback
  4. def calc_start(n, iter, callback):
  5. """
  6. 多进程API
  7. 参数:
  8. n 创建几个进程,当为0时,自动探测cpu个数,并创建N-1个
  9. iter 任务迭代器,可以直接迭代的对象。
  10. list,tuple或含有yield的函数
  11. 每次迭代返回的内容应该为( func , args , kwargs )
  12. * 注:
  13. func函数必须为模块级函数,否则在子进程中无法正常调用
  14. 且func函数第一个参数为idx(子进程序号)
  15. callback 回调函数,每个任务函数返回值的处理。
  16. 开发人员应该根据任务函数定义。
  17. 当为空时,系统不处理回调。
  18. 回调函数的参数为:
  19. 子进程序号,函数返回结果
  20. 返回值:
  21. 用法:
  22. 在主线程中调用该函数,该函数调用前,需准备好任务迭代器和回调函数。
  23. 该函数会阻塞,直到所有任务完成。
  24. """
  25. # 确定子进程个数
  26. if n == 0:
  27. n = cpu_count() - 1
  28. if n == 0:
  29. n = 1
  30. # 初始化工作队列和结果队列
  31. task_queue = Queue()
  32. done_queue = Queue()
  33. # 先启动消费者待命
  34. subs = []
  35. for i in range(n):
  36. p = Process(target=worker, args=(i, task_queue, done_queue))
  37. subs.append(p)
  38. p.daemon = False
  39. p.start()
  40. try:
  41. # 启动生产者线程
  42. import threading
  43. t = threading.Thread(target=publisher, args=(n, iter, task_queue)).start()
  44. stops = 0
  45. ex = False
  46. while stops < n:
  47. result = done_queue.get()
  48. if result == 'STOP':
  49. stops += 1
  50. elif type(result) is tuple and result[0] == 'EXCEPT':
  51. stops += 1
  52. ex = result[1]
  53. elif result and callable(callback):
  54. callback(*result)
  55. if ex:
  56. raise RuntimeError("并行处理时,子进程发生异常:\n%s" % ex)
  57. finally:
  58. # 清理所有子进程
  59. for p in subs:
  60. p.join()
  61. def publisher(n, iter, task_queue):
  62. """
  63. 使用线程处理任务发生的原因是有可能iter是一个yield函数迭代器,内容可能会非常多。
  64. """
  65. try:
  66. for obj in iter:
  67. if type(obj) is not tuple:
  68. raise RuntimeError('任务迭代器应该返回元组对象[%r]' % obj)
  69. if len(obj) != 3:
  70. raise RuntimeError('任务迭代器应该返回三元素元组对象[%r]' % obj)
  71. if not callable(obj[0]):
  72. raise RuntimeError('任务的第一元素应该为可执行对象[%r]' % obj[0])
  73. if type(obj[1]) is not tuple:
  74. raise RuntimeError('任务的第二元素应该为tuple[%r]' % obj[1])
  75. if type(obj[2]) is not dict:
  76. raise RuntimeError('任务的第三元素应该为dict[%r]' % obj[2])
  77. task_queue.put(obj)
  78. except:
  79. traceback.format_exc()
  80. for i in range(n):
  81. task_queue.put('STOP') # 发送给子进程结束信号
  82. def worker(idx, task_queue, done_queue):
  83. try:
  84. for func, args, kwargs in iter(task_queue.get, 'STOP'):
  85. result = func(idx, *args, **kwargs)
  86. done_queue.put((idx, result))
  87. done_queue.put('STOP')
  88. except:
  89. ex = traceback.format_exc()
  90. done_queue.put(('EXCEPT', ex))
  91. finally:
  92. import sys
  93. sys.exit(0)
  94. # for test
  95. # 由于sum函数需要在子进程中执行,因此,必须将其配置为模块级的
  96. def calc(idx, a, b):
  97. r = 0
  98. if a == 5:
  99. raise RuntimeError('haha')
  100. for i in range(10):
  101. time.sleep(0.1)
  102. r += i
  103. return a