天道酬勤,学无止境

分析 python 多处理池(Profiling a python multiprocessing pool)

问题

我正在尝试在多处理池中的每个进程上运行 cProfile.runctx(),以了解我的源中的多处理瓶颈是什么。 这是我正在尝试做的一个简化示例:

from multiprocessing import Pool
import cProfile

def square(i):
    return i*i

def square_wrapper(i):
    cProfile.runctx("result = square(i)",
        globals(), locals(), "file_"+str(i))
    # NameError happens here - 'result' is not defined.
    return result

if __name__ == "__main__":
    pool = Pool(8)
    results = pool.map_async(square_wrapper, range(15)).get(99999)
    print results

不幸的是,尝试在探查器中执行“result = square(i)”不会影响调用它的范围内的“result”。 我怎样才能在这里完成我想要做的事情?

回答1

试试这个:

def square_wrapper(i):
    result = [None]
    cProfile.runctx("result[0] = square(i)", globals(), locals(), "file_%d" % i)
    return result[0]

受限制的 HTML

  • 允许的HTML标签:<a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • 自动断行和分段。
  • 网页和电子邮件地址自动转换为链接。

相关推荐
  • Python多处理和共享计数器(Python multiprocessing and a shared counter)
    问题 我在多处理模块上遇到了麻烦。 我正在使用具有其map方法的工作人员池从大量文件中加载数据,并且对于每个文件,我都使用自定义函数来分析数据。 每次处理文件时,我都希望更新一个计数器,以便跟踪要处理的文件数量。 这是示例代码: def analyze_data( args ): # do something counter += 1 print counter if __name__ == '__main__': list_of_files = os.listdir(some_directory) global counter counter = 0 p = Pool() p.map(analyze_data, list_of_files) 我找不到解决方案。 回答1 问题在于counter变量没有在您的进程之间共享:每个单独的进程都在创建它自己的本地实例并对其进行递增。 有关可用于在进程之间共享状态的某些技术,请参阅文档的本节。 在您的情况下,您可能希望在工作人员之间共享一个Value实例 这是示例的工作版本(带有一些虚拟输入数据)。 请注意,它使用的是全局值,我实际上会在实践中尽量避免使用这些值: from multiprocessing import Pool, Value from time import sleep counter = None def init
  • 多处理池是否为每个进程分配了相同数量的任务,或者是否将它们分配为可用的?(Do multiprocessing pools give every process the same number of tasks, or are they assigned as available?)
    问题 当您将可迭代对象map到multiprocessing.Pool处理过程时,池在开始时是针对池中的每个进程划分为一个队列的迭代,还是有一个公共的队列供进程释放时从中提取任务? def generate_stuff(): for foo in range(100): yield foo def process(moo): print moo pool = multiprocessing.Pool() pool.map(func=process, iterable=generate_stuff()) pool.close() 因此,鉴于此未经测试的建议代码; 如果池中有4个进程,那么每个进程是否分配了25项要做的事情,或者由寻找一件事情的进程一个接一个地挑出100项工作,因此每个进程可能会处理不同数量的事情,例如30个,26、24、20。 回答1 因此,鉴于此未经测试的建议代码; 如果池中有4个进程,那么每个进程是否分配了25项要做的事情,或者由寻找一件事情的进程一个接一个地挑出100项工作,因此每个进程可能会处理不同数量的事情,例如30个,26、24、20。 好吧,最明显的答案就是要对其进行测试。 照原样,测试可能不会告诉您太多信息,因为作业将尽快完成,并且即使池化的流程在准备就绪时抢占了作业,事情也可能最终平均分配。 但是有一种简单的方法可以解决此问题: import
  • 线程池类似于多处理池?(Threading pool similar to the multiprocessing Pool?)
    问题 是否有用于工作线程的Pool类,类似于多处理模块的Pool类? 例如,我喜欢一种并行化地图功能的简单方法 def long_running_func(p): c_func_no_gil(p) p = multiprocessing.Pool(4) xs = p.map(long_running_func, range(100)) 但是,我希望这样做而不会产生新流程的开销。 我知道GIL。 但是,在我的用例中,该函数将是IO绑定的C函数,python包装器将在实际函数调用之前为其释放GIL。 我必须编写自己的线程池吗? 回答1 我刚刚发现,在multiprocessing模块中实际上有一个基于线程的Pool接口,但是它有些隐藏并且没有正确记录。 可以通过导入 from multiprocessing.pool import ThreadPool 它是使用包装Python线程的虚拟Process类实现的。 可以在multiprocessing.dummy中找到该基于线程的Process类,该类在文档中进行了简要介绍。 该虚拟模块应该提供基于线程的整个多处理接口。 回答2 在Python 3中,您可以使用current.futures.ThreadPoolExecutor,即: executor = ThreadPoolExecutor(max_workers=10) a =
  • Python多进程分析(Python multiprocess profiling)
    问题 我正在努力弄清楚如何分析一个简单的多进程python脚本 import multiprocessing import cProfile import time def worker(num): time.sleep(3) print 'Worker:', num if __name__ == '__main__': for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) cProfile.run('p.start()', 'prof%d.prof' %i) 我正在启动5个进程,因此cProfile会生成5个不同的文件。 在每个方法的内部,我想看到我的方法'worker'大约需要3秒钟才能运行,但是我只看到了'start'方法中正在发生的事情。 如果有人可以向我解释这一点,我将不胜感激。 更新:基于公认答案的工作示例: import multiprocessing import cProfile import time def test(num): time.sleep(3) print 'Worker:', num def worker(num): cProfile.runctx('test(num)', globals(), locals(), 'prof%d.prof' %num)
  • 键盘中断与python的多处理池(Keyboard Interrupts with python's multiprocessing Pool)
    问题 如何使用python的多处理池处理KeyboardInterrupt事件? 这是一个简单的示例: from multiprocessing import Pool from time import sleep from sys import exit def slowly_square(i): sleep(1) return i*i def go(): pool = Pool(8) try: results = pool.map(slowly_square, range(40)) except KeyboardInterrupt: # **** THIS PART NEVER EXECUTES. **** pool.terminate() print "You cancelled the program!" sys.exit(1) print "\nFinally, here are the results: ", results if __name__ == "__main__": go() 当运行上面的代码时,按^C会引发KeyboardInterrupt ,但是该过程只是在此时挂起,因此我必须在外部将其杀死。 我希望能够随时按^C并导致所有进程正常退出。 回答1 这是一个Python错误。 等待threading.Condition.wait()中的条件时
  • Python:使用多处理池时,写入具有队列的单个文件(Python: Writing to a single file with queue while using multiprocessing Pool)
    问题 我有成千上万的文本文件,希望通过各种方式进行解析。 我想将输出保存到单个文件中,而不会出现同步问题。 我一直在使用多处理池来节省时间,但是我不知道如何结合使用Pool和Queue。 以下代码将保存infile名称以及文件中连续的“ x”的最大数量。 但是,我希望所有进程都将结果保存到同一文件中,而不是像示例中那样保存到不同文件中。 任何帮助,将不胜感激。 import multiprocessing with open('infilenamess.txt') as f: filenames = f.read().splitlines() def mp_worker(filename): with open(filename, 'r') as f: text=f.read() m=re.findall("x+", text) count=len(max(m, key=len)) outfile=open(filename+'_results.txt', 'a') outfile.write(str(filename)+'|'+str(count)+'\n') outfile.close() def mp_handler(): p = multiprocessing.Pool(32) p.map(mp_worker, filenames) if __name__ == '_
  • 为python多处理池中的worker获取唯一的ID(Get a unique ID for worker in python multiprocessing pool)
    问题 有没有一种方法可以为python多处理池中的每个工作程序分配一个唯一的ID,从而使池中特定工作程序正在运行的作业可以知道哪个工作程序正在运行它? 根据文档,一个Process有一个name但是 名称是仅用于标识目的的字符串。 它没有语义。 多个进程可以使用相同的名称。 对于我的特定用例,我想在一组四个GPU上运行一堆作业,并且需要为运行该作业的GPU设置设备编号。 由于作业的长度不一致,因此我想确保在上一个作业完成之前,尝试在该作业上运行的作业在GPU上没有冲突(因此这避免了将ID预先分配给作业工作单元)。 回答1 似乎您想要的很简单: multiprocessing.current_process() 。 例如: import multiprocessing def f(x): print multiprocessing.current_process() return x * x p = multiprocessing.Pool() print p.map(f, range(6)) 输出: $ python foo.py <Process(PoolWorker-1, started daemon)> <Process(PoolWorker-2, started daemon)> <Process(PoolWorker-3, started daemon)>
  • Python:线程、进程与协程(7)——线程池
    前面转载了一篇分析进程池源码的博文,是一篇分析进程池很全面的文章,点击此处可以阅读。在Python中还有一个线程池的概念,它也有并发处理能力,在一定程度上能提高系统运行效率;不正之处欢迎批评指正。 线程的生命周期可以分为5个状态:创建、就绪、运行、阻塞和终止。自线程创建到终止,线程便不断在运行、创建和销毁这3个状态。一个线程的运行时间可由此可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。看看之前介绍线程的博文的例子中(点击此处可以阅读),有多少个任务,就创建多少个线程,但是由于Python特有的GIL限制,它并不是真正意义上的多线程,反而会因为频繁的切换任务等开销而降低了性能(点击此处可以了解Python的GIL)。这种情况下可以使用线程池提高运行效率。 线程池的基本原理如下图,它是通过将事先创建多个能够执行任务的线程放入池中,所需要执行的任务通常要被安排在队列任务中。一般情况下,需要处理的任务比线程数目要多,线程执行完当前任务后,会从队列中取下一个任务,知道所有的任务完成。 由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开”
  • Python中带有工作池的异步多处理:超时后如何继续进行?(Asynchronous multiprocessing with a worker pool in Python: how to keep going after timeout?)
    问题 我想使用一个进程池来运行多个作业,并应用给定的超时时间,然后将其杀死,并替换为另一个处理下一个任务的作业。 我尝试使用multiprocessing模块,该模块提供了一种异步运行工作者池的方法(例如,使用map_async ),但是我只能设置“全局”超时,之后所有进程将被map_async 。 是否有可能有一个单独的超时,在此之后,只有一个花费太长时间的进程被杀死,然后又向池中添加了一个新的工作线程(处理下一个任务并跳过超时的任务)? 这是一个简单的例子来说明我的问题: def Check(n): import time if n % 2 == 0: # select some (arbitrary) subset of processes print "%d timeout" % n while 1: # loop forever to simulate some process getting stuck pass print "%d done" % n return 0 from multiprocessing import Pool pool = Pool(processes=4) result = pool.map_async(Check, range(10)) print result.get(timeout=1) 超时后,所有工作人员均被杀死,程序退出。 相反
  • 如何获得 Python 多处理池要完成的“工作”量?(How to get the amount of “work” left to be done by a Python multiprocessing Pool?)
    问题 到目前为止,每当我需要使用多处理时,我都是通过手动创建“进程池”并与所有子进程共享工作队列来实现的。 例如: from multiprocessing import Process, Queue class MyClass: def __init__(self, num_processes): self._log = logging.getLogger() self.process_list = [] self.work_queue = Queue() for i in range(num_processes): p_name = 'CPU_%02d' % (i+1) self._log.info('Initializing process %s', p_name) p = Process(target = do_stuff, args = (self.work_queue, 'arg1'), name = p_name) 通过这种方式,我可以向队列中添加内容,这些内容将被子进程使用。 然后我可以通过检查Queue.qsize()监控处理的Queue.qsize() : while True: qsize = self.work_queue.qsize() if qsize == 0: self._log.info('Processing finished') break
  • Python多处理池,加入; 等不及要继续?(Python multiprocessing pool, join; not waiting to go on?)
    问题 (1)我正在尝试使用pool.map然后再使用pool.map pool.join() ,但是python似乎并没有等待pool.map完成之后再经过pool.join() 。 这是我尝试过的简单示例: from multiprocessing import Pool foo = {1: []} def f(x): foo[1].append(x) print foo def main(): pool = Pool() pool.map(f, range(100)) pool.close() pool.join() print foo if __name__ == '__main__': main() 打印的输出仅为{1: []} ,就好像python只是忽略了join命令并在有机会运行f之前运行print foo一样。 预期的结果是foo为{1:[0,1,...,99]} ,并使用普通的内置python map给出此结果。 为什么合并版本打印{1: []} ,如何更改代码以打印预期结果? (2)理想我也喜欢定义foo中的局部变量main()并将它传递给f ,但通过使这样做foo的第一个参数f和使用 pool.map(functools.partial(f, foo), range(100)) 产生相同的输出。 (并且可能还有一个问题,每个进程现在都有自己的foo ?副本)
  • 如何在python多处理池中使用关键字参数apply_async(how do I use key word arguments with python multiprocessing pool apply_async)
    问题 我正在尝试使用pythons的multiprocessing模块,特别是Pool的apply_async方法。 我正在尝试使用自变量和关键字自变量调用函数。 如果我不使用kwargs调用该函数就可以了,但是当我尝试添加关键字参数时,我得到了: TypeError: apply_async() got an unexpected keyword argument 'arg2'下面是我正在运行的测试代码 #!/usr/bin/env python import multiprocessing from time import sleep def test(arg1, arg2=1, arg3=2): sleep(5) if __name__ == '__main__': pool = multiprocessing.Pool() for t in range(1000): pool.apply_async(test, t, arg2=5) pool.close() pool.join() 如何调用该函数以使其接受关键​​字参数? 回答1 在字典中传递关键字args(在元组中传递位置参数): pool.apply_async(test, (t,), dict(arg2=5)) 回答2 Janne的答案在python 2.7.11中对我不起作用(不确定原因)。 函数test(
  • 多重处理池使Numpy矩阵乘法变慢(Multiprocessing.Pool makes Numpy matrix multiplication slower)
    问题 所以,我在玩multiprocessing.Pool和Numpy ,但是似乎我错过了一些重要的观点。 为什么pool版本慢得多? 我看着htop ,可以看到创建了多个进程,但是它们都共享一个CPU,总计约100%。 $ cat test_multi.py import numpy as np from timeit import timeit from multiprocessing import Pool def mmul(matrix): for i in range(100): matrix = matrix * matrix return matrix if __name__ == '__main__': matrices = [] for i in range(4): matrices.append(np.random.random_integers(100, size=(1000, 1000))) pool = Pool(8) print timeit(lambda: map(mmul, matrices), number=20) print timeit(lambda: pool.map(mmul, matrices), number=20) $ python test_multi.py 16.0265390873 19.097837925 [更新]
  • python多处理池终止(python multiprocessing pool terminate)
    问题 我正在开发renderfarm,我需要我的客户端能够启动渲染器的多个实例,而不会阻塞,以便客户端可以接收新命令。 我的工作正常,但是在终止创建的进程时遇到了麻烦。 在全局级别,我定义了我的池(以便可以从任何函数访问它): p = Pool(2) 然后,我使用apply_async调用渲染器: for i in range(totalInstances): p.apply_async(render, (allRenderArgs[i],args[2]), callback=renderFinished) p.close() 该功能完成,在后台启动进程,并等待新命令。 我做了一个简单的命令,它将杀死客户端并停止渲染: def close(): ''' close this client instance ''' tn.write ("say "+USER+" is leaving the farm\r\n") try: p.terminate() except Exception,e: print str(e) sys.exit() 它似乎没有给出错误(它将显示错误),python终止了,但是后台进程仍在运行。 谁能推荐一种更好的方法来控制这些已启动的程序? 回答1 我找到了解决方案:在单独的线程中停止池,如下所示: def close_pool(): global pool
  • python爬取b站up主粉丝信息_使用Python爬取B站数据
    简介 最近突发奇想,想获取一下B站上的用户数据做个分析啥的。这个东西已经有很多人做过了,所以网上的成功案例也比较多。但是不少的信息已经不适合现在使用了。比如一些使用api.bilibili.com的接口。这里记录一下趟坑的经历。 流程 由于我最初的目标只是通过用户名获得一个用户的个人空间地址。分析了一下url之后发现,所有用户都会通过一个自增的mid进行区分。 最初看的几篇教程中,获得用户的数据不仅可以通过mid这个参数,还可以通过用户名,也就是user字段来进行查询。即http://api.bilibili.cn/userinfo这个接口。但是发现这个接口已经不能用了。发送消息返回404状态。 那么就只有根据浏览器行为分析找找接口了。检查发现了一个这样的接口https://space.bilibili.com/ajax/member/GetInfo参数是mid字段。但是无法通过用户名获取mid。 接下来的想法是找找还有什么地方可以获取这个人的mid,比如试图抓取关注的up注的关注者,发现只能抓取前五页。或者是使用百度之类的搜索引擎的高级搜索,看看能不能搜索到相关的信息。 以上尝试都失败的情况下,尝试获取bilibili全部的用户数据。因为mid是一个从1开始自增的数据,因此可以用这种方法来遍历这些用户的信息,来找到相关的数据。 bilibili有接近一亿用户。直接单线程跑数据
  • python多处理池重试(python multiprocessing pool retries)
    问题 如果原始计算失败,是否可以使用简单的池重新发送一条数据进行处理? import random from multiprocessing import Pool def f(x): if random.getrandbits(1): raise ValueError("Retry this computation") return x*x p = Pool(5) # If one of these f(x) calls fails, retry it with another (or same) process p.map(f, [1,2,3]) 回答1 如果您可以(或不介意)立即重试,请使用包装函数的装饰器: import random from multiprocessing import Pool from functools import wraps def retry(f): @wraps(f) def wrapped(*args, **kwargs): while True: try: return f(*args, **kwargs) except ValueError: pass return wrapped @retry def f(x): if random.getrandbits(1): raise ValueError("Retry this
  • 在终端和Django或Flask的代码模块中使用python multiprocessing pool(Using python multiprocessing Pool in the terminal and in code modules for Django or Flask)
    问题 当在Python中使用以下代码在使用multiprocessing.Pool时,会有一些奇怪的行为。 from multiprocessing import Pool p = Pool(3) def f(x): return x threads = [p.apply_async(f, [i]) for i in range(20)] for t in threads: try: print(t.get(timeout=1)) except Exception: pass 我收到以下错误三遍(池中的每个线程一个),并且打印出“ 3”到“ 19”: AttributeError: 'module' object has no attribute 'f' 前三个apply_async调用永不返回。 同时,如果我尝试: from multiprocessing import Pool p = Pool(3) def f(x): print(x) p.map(f, range(20)) 我得到AttributeError 3次,外壳打印“ 6”到“ 19”,然后挂起,无法被[Ctrl] + [C]杀死 多处理文档具有以下说法: 此软件包中的功能要求子模块可以导入主模块。 这是什么意思? 为了澄清起见,我正在终端中运行代码以测试功能,但最终我希望能够将其放入Web服务器的模块中。
  • Python进程池非守护进程?(Python Process Pool non-daemonic?)
    问题 是否可以创建非守护进程的python池? 我希望一个池能够调用内部有另一个池的函数。 我想要这个,因为守护进程无法创建进程。 具体来说,它将导致错误: AssertionError: daemonic processes are not allowed to have children 例如,考虑以下情形: function_a具有运行function_b的池,该池具有运行function_c的池。 该函数链将失败,因为在守护进程中运行了function_b ,并且守护进程无法创建进程。 回答1 multiprocessing.pool.Pool类在其__init__方法中创建工作进程,使其成为守护进程并启动它们,并且无法在启动之前将其daemon属性重新设置为False (此后不再允许)。 但是您可以创建自己的multiprocesing.pool.Pool子类( multiprocessing.Pool只是一个包装函数),并替换您自己的multiprocessing.Process子类,该子类始终是非守护进程的,用于工作进程。 这是如何执行此操作的完整示例。 重要的部分是顶部的两个类NoDaemonProcess和MyPool ,最后在您的MyPool实例上调用pool.close()和pool.join() 。 #!/usr/bin/env python # -*-
  • 利用“写时复制”功能将数据复制到Multiprocessing.Pool()工作进程(Leveraging “Copy-on-Write” to Copy Data to Multiprocessing.Pool() Worker Processes)
    问题 我有一些multiprocessing Python代码,看起来像这样: import time from multiprocessing import Pool import numpy as np class MyClass(object): def __init__(self): self.myAttribute = np.zeros(100000000) # basically a big memory struct def my_multithreaded_analysis(self): arg_lists = [(self, i) for i in range(10)] pool = Pool(processes=10) result = pool.map(call_method, arg_lists) print result def analyze(self, i): time.sleep(10) return i ** 2 def call_method(args): my_instance, i = args return my_instance.analyze(i) if __name__ == '__main__': my_instance = MyClass() my_instance.my_multithreaded_analysis()
  • 内存使用率随着Python的multiprocessing.pool的增长而增长(Memory usage keep growing with Python's multiprocessing.pool)
    问题 这是程序: #!/usr/bin/python import multiprocessing def dummy_func(r): pass def worker(): pass if __name__ == '__main__': pool = multiprocessing.Pool(processes=16) for index in range(0,100000): pool.apply_async(worker, callback=dummy_func) # clean up pool.close() pool.join() 我发现内存使用量(VIRT和RES)一直保持增长,直到close()/ join(),有什么解决方案可以摆脱这种情况? 我尝试使用2.7的maxtasksperchild,但也无济于事。 我有一个更复杂的程序,调用apply_async()〜6M次,并且在〜1.5M时,我已经拥有6G + RES,为避免所有其他因素,我将该程序简化为上述版本。 编辑: 事实证明,此版本效果更好,感谢大家的投入: #!/usr/bin/python import multiprocessing ready_list = [] def dummy_func(index): global ready_list ready_list.append(index) def