天道酬勤,学无止境

Python multiprocessing Pool map and imap

I have a multiprocessing script with pool.map that works. The problem is that not all processes take as long to finish, so some processes fall asleep because they wait until all processes are finished (same problem as in this question). Some files are finished in less than a second, others take minutes (or hours).

If I understand the manual (and this post) correctly, pool.imap is not waiting for all the processes to finish, if one is done, it is providing a new file to process. When I try that, the script is speeding over the files to process, the small ones are processed as expected, the large files (that take more time to process) don't finish until the end (are killed without notice ?). Is this normal behavior for pool.imap, or do I need to add more commands/parameters ? When I add the time.sleep(100) in the else part as test, it is processing more large files but the other processes fall asleep. Any suggestions ? Thanks

def process_file(infile):
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    #nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = []
        for infile in os.listdir():  
            todolist.append(infile)
        try:   
            p = Pool(processes=nprocesses)
            p.imap(process_file, todolist)
        except KeyboardInterrupt:                
            print("Shutting processes down")
           # Optionally try to gracefully shut down the worker processes here.       
            p.close()
            p.terminate()
            p.join()
        except StopIteration:
            continue     
        else:
            time.sleep(100)
            os.chdir('..')
        p.close()
        p.join() 

if __name__ == '__main__':
    main()    

评论

Since you already put all your files in a list, you could put them directly into a queue. The queue is then shared with your sub-processes that take the file names from the queue and do their stuff. No need to do it twice (first into list, then pickle list by Pool.imap). Pool.imap is doing exactly the same but without you knowing it.

todolist = []
for infile in os.listdir():  
    todolist.append(infile)

can be replaced by:

todolist = Queue()
for infile in os.listdir():  
    todolist.put(infile)

The complete solution would then look like:

def process_file(inqueue):
    for infile in iter(inqueue.get, "STOP"):
        #do stuff until inqueue.get returns "STOP"
    #read infile
    #compare things in infile
    #acquire Lock, save things in outfile, release Lock
    #delete infile

def main():
    nprocesses = 8
    global filename
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9']
    for d in pathlist:
        os.chdir(d)      
        todolist = Queue()
        for infile in os.listdir():  
            todolist.put(infile)
        process = [Process(target=process_file,
                      args=(todolist) for x in range(nprocesses)]
        for p in process:
            #task the processes to stop when all files are handled
            #"STOP" is at the very end of queue
            todolist.put("STOP")
        for p in process:
            p.start()
        for p in process:
            p.join()    
if __name__ == '__main__':
    main()

受限制的 HTML

  • 允许的HTML标签:<a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • 自动断行和分段。
  • 网页和电子邮件地址自动转换为链接。

相关推荐
  • multiprocessing.Pool:map_async和imap有什么区别?(multiprocessing.Pool: What's the difference between map_async and imap?)
    问题 我正在尝试学习如何使用Python的multiprocessing程序包,但我不了解map_async和imap之间的区别。 我注意到map_async和imap都是异步执行的。 那么我什么时候应该在另一个上使用呢? 我应该如何检索map_async返回的结果? 我应该使用这样的东西吗? def test(): result = pool.map_async() pool.close() pool.join() return result.get() result=test() for i in result: print i 回答1 imap / imap_unordered和map / map_async之间有两个主要区别: 他们消耗可迭代的方式将传递给他们。 他们将结果返回给您的方式。 map通过将iterable转换为列表(假设它还不是列表)来消耗您的iterable,将其分成多个块,然后将这些块发送给Pool的工作进程。 与将可迭代项中的每个项一次在一个进程之间传递相比,将可迭代项拆分为多个块的效果更好-特别是在可迭代项较大的情况下。 但是,将迭代器转换为列表以对其进行分块可能会具有很高的内存成本,因为整个列表都需要保留在内存中。 imap不会将您提供的可迭代项转换为列表,也不会将其拆分为多个块(默认情况下)。 它将一次遍历可迭代的一个元素
  • 多重处理和莳萝可以一起做什么?(What can multiprocessing and dill do together?)
    问题 我想在Python中使用multiprocessing库。 可悲的是, multiprocessing使用的pickle不支持带有闭包,lambda或__main__函数的函数。 所有这三个对我都很重要 In [1]: import pickle In [2]: pickle.dumps(lambda x: x) PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda> 幸运的是,莳萝的腌制能力更强。 显然dill在导入时表现出魔力,使泡菜发挥作用 In [3]: import dill In [4]: pickle.dumps(lambda x: x) Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ... 这非常令人鼓舞,特别是因为我无权访问多处理源代码。 可悲的是,我仍然无法使这个非常基本的例子起作用 import multiprocessing as mp import dill p = mp.Pool(4) print p.map(lambda x: x**2, range(10)) 为什么是这样? 我想念什么? 到底是multiprocessing +
  • Python Multiprocessing.Pool lazy iteration
    I'm wondering about the way that python's Multiprocessing.Pool class works with map, imap, and map_async. My particular problem is that I want to map on an iterator that creates memory-heavy objects, and don't want all these objects to be generated into memory at the same time. I wanted to see if the various map() functions would wring my iterator dry, or intelligently call the next() function only as child processes slowly advanced, so I hacked up some tests as such: def g(): for el in xrange(100): print el yield el def f(x): time.sleep(1) return x*x if __name__ == '__main__': pool = Pool
  • Python 3中的Concurrent.futures与Multiprocessing(Concurrent.futures vs Multiprocessing in Python 3)
    问题 Python 3.2引入了Concurrent Futures,这似乎是较旧的线程和多处理模块的一些高级组合。 与较旧的多处理模块相比,将此功能用于与CPU绑定的任务有什么优点和缺点? 本文建议他们更容易使用-是这样吗? 回答1 我不会称其为“更高级”的concurrent.futures -它是一个更简单的接口,其工作原理几乎相同,无论您使用多个线程还是多个进程作为基础并行化ization头。 所以,像“简单的界面”的几乎所有情况下,大同小异的取舍都参与:它有一个浅的学习曲线,这在很大程度上只是因为有可用少了这么多需要学习; 但是,由于它提供的选项较少,因此最终可能使您无法使用更丰富的界面而感到沮丧。 就与CPU绑定的任务而言,这还不够具体,以至于说不出什么有意义的事情。 对于CPython下与CPU绑定的任务,您需要多个进程而不是多个线程才能获得加速的机会。 但是,获得多少加速(如果有的话)取决于硬件,操作系统的详细信息,尤其取决于特定任务需要多少进程间通信。 在幕后,所有进程间并行化头都依赖于相同的OS原语-用于获得这些原语的高级API并不是底线速度的主要因素。 编辑:示例 这是您引用的文章中显示的最终代码,但是我添加了使它起作用所需的import语句: from concurrent.futures import ProcessPoolExecutor def
  • Show the progress of a Python multiprocessing pool imap_unordered call?
    I have a script that's successfully doing a multiprocessing Pool set of tasks with a imap_unordered() call: p = multiprocessing.Pool() rs = p.imap_unordered(do_work, xrange(num_tasks)) p.close() # No more work p.join() # Wait for completion However, my num_tasks is around 250,000, and so the join() locks the main thread for 10 seconds or so, and I'd like to be able to echo out to the command line incrementally to show the main process isn't locked. Something like: p = multiprocessing.Pool() rs = p.imap_unordered(do_work, xrange(num_tasks)) p.close() # No more work while (True): remaining = rs
  • 多重处理:使用tqdm显示进度条(Multiprocessing : use tqdm to display a progress bar)
    问题 为了使我的代码更加“ Pythonic”和更快,我使用“ multiprocessing”和一个map函数向其发送a)函数和b)迭代范围。 植入的解决方案(即直接在范围tqdm.tqdm(range(0,30))上调用tqdm不适用于多重处理(如以下代码中所述)。 进度条显示为0到100%(当python读取代码时?),但是它并不表示map函数的实际进度。 如何显示进度条以指示“地图”功能在哪一步? from multiprocessing import Pool import tqdm import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': p = Pool(2) r = p.map(_foo, tqdm.tqdm(range(0, 30))) p.close() p.join() 欢迎任何帮助或建议... 回答1 使用imap代替map,它返回已处理值的迭代器。 from multiprocessing import Pool import tqdm import time def _foo(my_number): square = my_number * my_number time
  • multiprocessing.Pool: What's the difference between map_async and imap?
    I'm trying to learn how to use Python's multiprocessing package, but I don't understand the difference between map_async and imap. I noticed that both map_async and imap are executed asynchronously. So when should I use one over the other? And how should I retrieve the result returned by map_async? Should I use something like this? def test(): result = pool.map_async() pool.close() pool.join() return result.get() result=test() for i in result: print i
  • 多处理池是否为每个进程分配了相同数量的任务,或者是否将它们分配为可用的?(Do multiprocessing pools give every process the same number of tasks, or are they assigned as available?)
    问题 当您将可迭代对象map到multiprocessing.Pool处理过程时,池在开始时是针对池中的每个进程划分为一个队列的迭代,还是有一个公共的队列供进程释放时从中提取任务? def generate_stuff(): for foo in range(100): yield foo def process(moo): print moo pool = multiprocessing.Pool() pool.map(func=process, iterable=generate_stuff()) pool.close() 因此,鉴于此未经测试的建议代码; 如果池中有4个进程,那么每个进程是否分配了25项要做的事情,或者由寻找一件事情的进程一个接一个地挑出100项工作,因此每个进程可能会处理不同数量的事情,例如30个,26、24、20。 回答1 因此,鉴于此未经测试的建议代码; 如果池中有4个进程,那么每个进程是否分配了25项要做的事情,或者由寻找一件事情的进程一个接一个地挑出100项工作,因此每个进程可能会处理不同数量的事情,例如30个,26、24、20。 好吧,最明显的答案就是要对其进行测试。 照原样,测试可能不会告诉您太多信息,因为作业将尽快完成,并且即使池化的流程在准备就绪时抢占了作业,事情也可能最终平均分配。 但是有一种简单的方法可以解决此问题: import
  • 多重处理:如何在类中定义的函数上使用Pool.map?(Multiprocessing: How to use Pool.map on a function defined in a class?)
    问题 当我运行类似的东西时: from multiprocessing import Pool p = Pool(5) def f(x): return x*x p.map(f, [1,2,3]) 它工作正常。 但是,将其作为类的函数: class calculate(object): def run(self): def f(x): return x*x p = Pool() return p.map(f, [1,2,3]) cl = calculate() print cl.run() 给我以下错误: Exception in thread Thread-1: Traceback (most recent call last): File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner self.run() File "/sw/lib/python2.6/threading.py", line 484, in run self.__target(*self.__args, **self.__kwargs) File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks put(task)
  • Python:使用多处理池时,写入具有队列的单个文件(Python: Writing to a single file with queue while using multiprocessing Pool)
    问题 我有成千上万的文本文件,希望通过各种方式进行解析。 我想将输出保存到单个文件中,而不会出现同步问题。 我一直在使用多处理池来节省时间,但是我不知道如何结合使用Pool和Queue。 以下代码将保存infile名称以及文件中连续的“ x”的最大数量。 但是,我希望所有进程都将结果保存到同一文件中,而不是像示例中那样保存到不同文件中。 任何帮助,将不胜感激。 import multiprocessing with open('infilenamess.txt') as f: filenames = f.read().splitlines() def mp_worker(filename): with open(filename, 'r') as f: text=f.read() m=re.findall("x+", text) count=len(max(m, key=len)) outfile=open(filename+'_results.txt', 'a') outfile.write(str(filename)+'|'+str(count)+'\n') outfile.close() def mp_handler(): p = multiprocessing.Pool(32) p.map(mp_worker, filenames) if __name__ == '_
  • Can I use a multiprocessing Queue in a function called by Pool.imap?
    I'm using python 2.7, and trying to run some CPU heavy tasks in their own processes. I would like to be able to send messages back to the parent process to keep it informed of the current status of the process. The multiprocessing Queue seems perfect for this but I can't figure out how to get it work. So, this is my basic working example minus the use of a Queue. import multiprocessing as mp import time def f(x): return x*x def main(): pool = mp.Pool() results = pool.imap_unordered(f, range(1, 6)) time.sleep(1) print str(results.next()) pool.close() pool.join() if __name__ == '__main__': main(
  • 为什么我可以将实例方法传递给multiprocessing.Process,但不能传递给multiprocessing.Pool?(Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?)
    问题 我正在尝试编写一个与multiprocessing.Pool同时应用功能的应用程序。 我希望这个函数是一个实例方法(所以我可以在不同的子类中不同地定义它)。 这似乎是不可能的。 正如我在其他地方了解到的那样,显然不能对绑定的方法进行腌制。 那么,为什么以绑定方法作为目标来启动multiprocessing.Process呢? 如下代码: import multiprocessing def test1(): print "Hello, world 1" def increment(x): return x + 1 class testClass(): def process(self): process1 = multiprocessing.Process(target=test1) process1.start() process1.join() process2 = multiprocessing.Process(target=self.test2) process2.start() process2.join() def pool(self): pool = multiprocessing.Pool(1) for answer in pool.imap(increment, range(10)): print answer print for answer in
  • multiprocessing.Pool:何时使用apply,apply_async或map?(multiprocessing.Pool: When to use apply, apply_async or map?)
    问题 我还没有看到关于Pool.apply,Pool.apply_async和Pool.map用例的清晰示例。 我主要使用Pool.map ; 别人的优点是什么? 回答1 回顾Python的旧时代,要使用任意参数调用函数,可以使用apply : apply(f,args,kwargs) apply于Python2.7依然存在虽然不是在Python3,一般不再使用。 如今, f(*args,**kwargs) 是首选。 multiprocessing.Pool模块尝试提供类似的接口。 Pool.apply类似于Python apply ,不同之处在于函数调用是在单独的进程中执行的。 Pool.apply阻止,直到功能完成。 Pool.apply_async也类似于Python的内置apply ,只是调用立即返回而不是等待结果。 返回一个AsyncResult对象。 您调用其get()方法以检索函数调用的结果。 get()方法将阻塞,直到函数完成。 因此, pool.apply(func, args, kwargs)等同于pool.apply_async(func, args, kwargs).get() 。 与Pool.apply , Pool.apply_async方法还具有一个回调(如果提供),该回调在函数完成时被调用。 可以使用它代替调用get() 。 例如: import
  • 是否可以在Pool.imap调用的函数中使用多处理队列?(Can I use a multiprocessing Queue in a function called by Pool.imap?)
    问题 我正在使用python 2.7,并尝试在自己的进程中运行一些CPU繁重的任务。 我希望能够将消息发送回父流程,以使其随时了解流程的当前状态。 为此,多处理队列似乎很完美,但是我不知道如何使它工作。 因此,这是我的基本工作示例,减去了Queue的使用。 import multiprocessing as mp import time def f(x): return x*x def main(): pool = mp.Pool() results = pool.imap_unordered(f, range(1, 6)) time.sleep(1) print str(results.next()) pool.close() pool.join() if __name__ == '__main__': main() 我尝试以几种方式传递队列,它们收到错误消息“ RuntimeError:队列对象仅应通过继承在进程之间共享”。 这是我根据之前发现的答案尝试的一种方法。 (尝试使用Pool.map_async和Pool.imap时遇到相同的问题) import multiprocessing as mp import time def f(args): x = args[0] q = args[1] q.put(str(x)) time.sleep(0.1) return x*x
  • Spyder中的简单Python多重处理功能不会输出结果(Simple Python Multiprocessing function in Spyder doesn't output results)
    问题 我这里有一个非常简单的函数,试图在其中运行和测试,但是它不输出任何东西,也没有任何错误。 我已经多次检查代码,但没有任何错误。 我打印了工作,这是我得到的: [<Process(Process-12, stopped[1])>, <Process(Process-13, stopped[1])>, <Process(Process-14, stopped[1])>, <Process(Process-15, stopped[1])>, <Process(Process-16, stopped[1])>] 这是代码: import multiprocessing def worker(num): print "worker ", num return jobs = [] for i in range(5): p = multiprocessing.Process(target = worker, args = (i,)) jobs.append(p) p.start() 这是我期望的结果,但未输出任何内容: Worker: 0 Worker: 1 Worker: 2 Worker: 3 Worker: 4 回答1 评论表明OP和Spyder都使用Windows。 由于Spyder重定向stdout并且Windows不支持分支,因此新的子进程将不会打印到Spyder控制台中。
  • python多处理池重试(python multiprocessing pool retries)
    问题 如果原始计算失败,是否可以使用简单的池重新发送一条数据进行处理? import random from multiprocessing import Pool def f(x): if random.getrandbits(1): raise ValueError("Retry this computation") return x*x p = Pool(5) # If one of these f(x) calls fails, retry it with another (or same) process p.map(f, [1,2,3]) 回答1 如果您可以(或不介意)立即重试,请使用包装函数的装饰器: import random from multiprocessing import Pool from functools import wraps def retry(f): @wraps(f) def wrapped(*args, **kwargs): while True: try: return f(*args, **kwargs) except ValueError: pass return wrapped @retry def f(x): if random.getrandbits(1): raise ValueError("Retry this
  • 如何使用带有多个参数的multiprocessing pool.map?(How to use multiprocessing pool.map with multiple arguments?)
    问题 在Python multiprocessing库中,是否存在pool.map的变体,它支持多个参数? text = "test" def harvester(text, case): X = case[0] text+ str(X) if __name__ == '__main__': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET pool.map(harvester(text,case),case, 1) pool.close() pool.join() 回答1 答案取决于版本和情况。 JF Sebastian首先描述了最近的Python版本(从3.3开始)的最一般的答案。 1它使用Pool.starmap方法,该方法接受一系列参数元组。 然后,它会自动将每个元组的参数解包,并将其传递给给定的函数: import multiprocessing from itertools import product def merge_names(a, b): return '{} & {}'.format(a, b) if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
  • numpy vs. multiprocessing and mmap
    I am using Python's multiprocessing module to process large numpy arrays in parallel. The arrays are memory-mapped using numpy.load(mmap_mode='r') in the master process. After that, multiprocessing.Pool() forks the process (I presume). Everything seems to work fine, except I am getting lines like: AttributeError("'NoneType' object has no attribute 'tell'",) in `<bound method memmap.__del__ of memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)>` ignored in the unittest logs. The tests pass fine, nevertheless. Any idea what's going on there
  • 我们什么时候应该调用multiprocessing.Pool.join?(When should we call multiprocessing.Pool.join?)
    问题 我正在使用'multiprocess.Pool.imap_unordered'如下 from multiprocessing import Pool pool = Pool() for mapped_result in pool.imap_unordered(mapping_func, args_iter): do some additional processing on mapped_result 我需要在for循环之后调用pool.close或pool.join吗? 回答1 不,您没有,但是如果您不再使用游泳池,那可能是个好主意。 提姆·彼得斯在这篇SO帖子中很好地说明了调用pool.close或pool.join原因: 至于Pool.close(),您应该在永远不会将更多工作提交给Pool实例的情况下(且仅在)进行调用。 因此,通常在主程序的可并行化部分完成时调用Pool.close()。 然后,当所有已分配的工作完成时,工作进程将终止。 调用Pool.join()等待工作进程终止也是一种很好的做法。 除其他原因外,通常没有很好的方法来报告并行化代码中的异常(异常仅在与您的主程序正在做的事情有关的上下文中发生),而Pool.join()提供了一个同步点,可以报告发生的某些异常在您否则无法看到的工作流程中。 回答2
  • Python multiprocessing behavior of Pool / starmap
    I've got a program using the multiprocessing library to compute some stuff. There are about 10K inputs to compute, each of them taking between 0.2 second and 10 seconds. My current approach uses a Pool: # Inputs signals = [list(s) for s in itertools.combinations_with_replacement(possible_inputs, 3)] # Compute with mp.Pool(processes = N) as p: p.starmap(compute_solutions, [(s, t0, tf, folder) for s in signals]) print (" | Computation done.") I've noticed that on the 300 / 400 last inputs to check, the program became a lot slower. My question is: how does the Pool and the starmap() behave? Fro