天道酬勤,学无止境

python multiprocessing, manager initiates process spawn loop

I have a simple python multiprocessing script that sets up a pool of workers that attempt to append work-output to a Manager list. The script has 3 call stacks: - main calls f1 that spawns several worker processes that call another function g1. When one attempts to debug the script (incidentally on Windows 7/64 bit/VS 2010/PyTools) the script runs into a nested process creation loop, spawning an endless number of processes. Can anyone determine why? I'm sure I am missing something very simple. Here's the problematic code: -

import multiprocessing
import logging

manager = multiprocessing.Manager()
results = manager.list()

def g1(x):
    y = x*x
    print "processing: y = %s" % y
    results.append(y)

def f1():
    logger = multiprocessing.log_to_stderr()
    logger.setLevel(multiprocessing.SUBDEBUG)

    pool = multiprocessing.Pool(processes=4)
    for (i) in range(0,15):
        pool.apply_async(g1, [i])
    pool.close()
    pool.join()

def main():
    f1()

if __name__ == "__main__":
    main()

PS: tried adding multiprocessing.freeze_support() to main to no avail.

评论

Basically, what sr2222 mentions in his comment is correct. From the multiprocessing manager docs, it says that the ____main____ module must be importable by the children. Each manager " object corresponds to a spawned child process", so each child is basically re-importing your module (you can see by adding a print statement at module scope to my fixed version!)...which leads to infinite recursion.

One solution would be to move your manager code into f1():

import multiprocessing
import logging

def g1(results, x):
    y = x*x
    print "processing: y = %s" % y
    results.append(y)

def f1():
    logger = multiprocessing.log_to_stderr()
    logger.setLevel(multiprocessing.SUBDEBUG)
    manager = multiprocessing.Manager()
    results = manager.list()
    pool = multiprocessing.Pool(processes=4)
    for (i) in range(0,15):
        pool.apply_async(g1, [results, i])
    pool.close()
    pool.join()


def main():
    f1()

if __name__ == "__main__":
    main()

受限制的 HTML

  • 允许的HTML标签:<a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • 自动断行和分段。
  • 网页和电子邮件地址自动转换为链接。

相关推荐
  • 了解多处理:Python中的共享内存管理,锁和队列(Understanding Multiprocessing: Shared Memory Management, Locks and Queues in Python)
    问题 多重处理是python中的强大工具,我想更深入地了解它。 我想知道何时使用普通锁和队列,当使用多进程管理器中的所有进程共享这些。 我想出了以下测试了四种不同情况的场景多处理: 使用游泳池和NO经理使用一个游泳池和一个经理使用单独的流程和NO经理使用单独的流程和管理 工作 所有条件执行工作职能the_job 。 the_job包括一些印刷其通过锁固定的。 此外,输入到函数简单地放入队列(以查看它是否可以从队列中回收)。 该输入是一个简单的索引idx从range(10)在主脚本调用创建start_scenario (在底部示出)。 def the_job(args): """The job for multiprocessing. Prints some stuff secured by a lock and finally puts the input into a queue. """ idx = args[0] lock = args[1] queue=args[2] lock.acquire() print 'I' print 'was ' print 'here ' print '!!!!' print '1111' print 'einhundertelfzigelf\n' who= ' By run %d \n' % idx print who lock
  • Python Multiprocessing concurrency using Manager, Pool and a shared list not working
    I am learning python multiprocessing, and I am trying to use this feature to populate a list with all the files present in an os. However, the code that I wrote is executing sequentially only. #!/usr/bin/python import os import multiprocessing tld = [os.path.join("/", f) for f in os.walk("/").next()[1]] #Gets a top level directory names inside "/" manager = multiprocessing.Manager() files = manager.list() def get_files(x): for root, dir, file in os.walk(x): for name in file: files.append(os.path.join(root, name)) mp = [multiprocessing.Process(target=get_files, args=(tld[x],)) for x in range
  • 使用multiprocessing.Manager.list而不是真实列表会使计算耗时(Using multiprocessing.Manager.list instead of a real list makes the calculation take ages)
    问题 我想从本示例开始尝试使用multiprocessing不同方式: $ cat multi_bad.py import multiprocessing as mp from time import sleep from random import randint def f(l, t): # sleep(30) return sum(x < t for x in l) if __name__ == '__main__': l = [randint(1, 1000) for _ in range(25000)] t = [randint(1, 1000) for _ in range(4)] # sleep(15) pool = mp.Pool(processes=4) result = pool.starmap_async(f, [(l, x) for x in t]) print(result.get()) 在这里, l是一个列表,当生成4个进程时,该列表将被复制4次。 为了避免这种情况,文档页面提供了使用队列,共享数组或使用multiprocessing.Manager创建的代理对象的信息。 对于最后一个,我更改了l的定义: $ diff multi_bad.py multi_good.py 10c10,11 < l = [randint(1, 1000) for _ in
  • 为python多处理池中的worker获取唯一的ID(Get a unique ID for worker in python multiprocessing pool)
    问题 有没有一种方法可以为python多处理池中的每个工作程序分配一个唯一的ID,从而使池中特定工作程序正在运行的作业可以知道哪个工作程序正在运行它? 根据文档,一个Process有一个name但是 名称是仅用于标识目的的字符串。 它没有语义。 多个进程可以使用相同的名称。 对于我的特定用例,我想在一组四个GPU上运行一堆作业,并且需要为运行该作业的GPU设置设备编号。 由于作业的长度不一致,因此我想确保在上一个作业完成之前,尝试在该作业上运行的作业在GPU上没有冲突(因此这避免了将ID预先分配给作业工作单元)。 回答1 似乎您想要的很简单: multiprocessing.current_process() 。 例如: import multiprocessing def f(x): print multiprocessing.current_process() return x * x p = multiprocessing.Pool() print p.map(f, range(6)) 输出: $ python foo.py <Process(PoolWorker-1, started daemon)> <Process(PoolWorker-2, started daemon)> <Process(PoolWorker-3, started daemon)>
  • 在类方法Python中调用多重处理(call multiprocessing in class method Python)
    问题 最初,我有一个类来存储一些处理后的值,并将其与其他方法重用。 问题是,当我尝试将类方法划分为多个进程以加快速度时,python生成了进程,但它似乎不起作用(正如我在“任务管理器”中看到的那样,只有1个进程在运行)并且结果从未传递过。 我进行了几次搜索,发现pathos.multiprocessing可以代替它,但是我想知道标准库是否可以解决这个问题? from multiprocessing import Pool class A(): def __init__(self, vl): self.vl = vl def cal(self, nb): return nb * self.vl def run(self, dt): t = Pool(processes=4) rs = t.map(self.cal, dt) t.close() return t a = A(2) a.run(list(range(10))) 回答1 您的代码失败了,因为它无法pickle实例方法( self.cal ),这是Python通过将它们映射到multiprocessing.Pool来生成多个进程时试图执行的操作。太复杂了,反正也不是很有用)-由于没有共享内存访问,因此它必须对数据进行“打包”并将其发送到生成的进程中以进行解压缩。 如果您尝试对实例进行腌制,也会发生同样a情况。
  • python_day10_多进程 / queue、pipes、manager / 进程锁、进程池 / 协程、Gevent、爬虫、socket / IO多路复用 / IO模式 / select
    python_多进程 / queue、pipes、manager / 进程锁、进程池 / 协程、Gevent、爬虫、socket / IO多路复用 / IO模式 / select 在这里得感谢,老师Alex金角大王(路飞学城IT) Python(给兄弟们挂个🔗) python边写边更… 一、多进程: (上面说了多线程操作,多线程操作适合于IO密集型的操作,不适合cpu密集型的操作) (IO密集型的操作:io操作不占用cpu,你从硬盘上读一块数据、你从网络上读一块数据或者你从内存里读一块数据) (CPU密集型的操作:计算占用cpu,例如:1+1;各种计算…) 因为python的多线程,要不断的实现“上下文的切换”;如果你的程序计算多的(cpu密集型的),不建议使用,可能还没串型的块;如果你的程序IO操作多一点,可以使用多线程… python的 “进程” 和 “线程” ,都是起的os的原生进程和线程,这些原生进程和线程都是os自己维护的;(若你的CPU是8核的,你起8个进程,这8个进程实打在这8个核上的;而且每一个进程都会有一个线程,所以“多线程”的目的又达到了;but,这些“多线程”的资源或者内存,不是共享的;而且你的进程若是很大,这样你的内存和其他资源会被拖到很累…) so:多进程,是想利用“多核”; 1.multiprocessing: import
  • Python在进程之间共享锁(Python sharing a lock between processes)
    问题 我正在尝试使用部分函数,​​以便pool.map()可以定位具有多个参数(在本例中为Lock()对象)的函数。 这是示例代码(摘自我的上一个问题的答案): from functools import partial def target(lock, iterable_item): for item in items: # Do cool stuff if (... some condition here ...): lock.acquire() # Write to stdout or logfile, etc. lock.release() def main(): iterable = [1, 2, 3, 4, 5] pool = multiprocessing.Pool() l = multiprocessing.Lock() func = partial(target, l) pool.map(func, iterable) pool.close() pool.join() 但是,当我运行此代码时,出现错误: Runtime Error: Lock objects should only be shared between processes through inheritance. 我在这里想念什么? 如何在子流程之间共享锁? 回答1
  • 如果父项在Python中被杀死,则杀死子进程(Kill Child Process if Parent is killed in Python)
    问题 我从python脚本生成了5个不同的进程,如下所示: p = multiprocessing.Process(target=some_method,args=(arg,)) p.start() 我的问题是,当父进程(主脚本)以某种方式被杀死时,子进程继续运行。 当父进程被杀死时,有没有办法杀死这样生成的子进程? 编辑:我正在尝试: p = multiprocessing.Process(target=client.start,args=(self.query_interval,)) p.start() atexit.register(p.terminate) 但这似乎不起作用 回答1 我自己也遇到了同样的问题,我有以下解决方案: 在调用p.start()之前,您可以设置p.daemon=True 。 然后就像这里提到的python.org multiprocessing 进程退出时,它将尝试终止其所有守护程序子进程。 回答2 不会通知孩子其父母去世的情况,只会以其他方式起作用。 但是,当进程终止时,其所有文件描述符都将关闭。 如果选择了要读取的管道,则管道的另一端会收到通知。 因此,您的父母可以在产生该过程之前创建一个管道(或者,实际上,您可以将stdin设置为管道),而孩子可以选择该管道进行读取。 当父端关闭时,它将报告已准备好读取。 这要求您的孩子运行一个主循环
  • Python multiprocessing never joins
    I'm using multiprocessing, and specifically a Pool to spin off a couple of 'threads' to do a bunch of slow jobs that I have. However, for some reason, I can't get the main thread to rejoin, even though all of the children appear to have died. Resolved: It appears the answer to this question is to just launch multiple Process objects, rather than using a Pool. It's not abundantly clear why, but I suspect the remaining process is a manager for the pool and it's not dying when the processes finish. If anyone else has this problem, this is the answer. Main Thread pool = Pool(processes=12
  • Python - calling multiprocessing.pool inside a daemon
    I have a Python script which spawns a daemon process. Inside the process, I am using multiprocessing.pool to run 1 to 4 processes simultaneously. When I run this outside the daemon process, it works perfectly (i.e., when I set run_from_debugger=True - see code below), but if I run the code via a daemon process, (i.e., run_from_debugger=False), async_function is never executed. Is it possible to use multiprocessing.pool inside a daemon process??? I am using Python-daemon 1.6 as my daemon package (if it matters). Code: def loop_callback(params): #Spawn the process in the pool # Because loop
  • PicklingError:不能腌制 :与十进制不同的对象(PicklingError: Can't pickle <class 'decimal.Decimal'>: it's not the same object as decimal.Decimal)
    问题 这是我今天在<a href"http://filmaster.com"> filmaster.com上遇到的错误: PicklingError: Can't pickle <class 'decimal.Decimal'>: it's not the same object as decimal.Decimal 这到底是什么意思? 似乎没有什么意义...似乎与django缓存有关。 您可以在此处查看整个回溯: Traceback (most recent call last): File "/home/filmaster/django-trunk/django/core/handlers/base.py", line 92, in get_response response = callback(request, *callback_args, **callback_kwargs) File "/home/filmaster/film20/film20/core/film_views.py", line 193, in show_film workflow.set_data_for_authenticated_user() File "/home/filmaster/film20/film20/core/film_views.py", line 518, in set
  • 如何限制多处理流程的范围?(How can I restrict the scope of a multiprocessing process?)
    问题 通过使用python的多处理模块,以下人为设计的示例以最低的内存要求运行: import multiprocessing # completely_unrelated_array = range(2**25) def foo(x): for x in xrange(2**28):pass print x**2 P = multiprocessing.Pool() for x in range(8): multiprocessing.Process(target=foo, args=(x,)).start() 取消注释completely_unrelated_array的创建,您会发现每个生成的进程都会为completely_unrelated_array的副本分配内存! 这是一个更大项目的最小示例,我不知道如何解决。 多重处理似乎可以复制全局的所有内容。 我并不需要一个共享内存对象,我只需要在通过x ,并处理它无需将整个程序的内存开销。 侧面观察:有趣的是foo中的print id(completely_unrelated_array)给出了相同的值,这表明它可能不是副本... 回答1 由于性质的os.fork()在您的全局命名空间的变量__main__模块将由子进程继承(假设你是一个POSIX平台上),所以你会看到孩子们的内存使用情况在创建它们后立即反映出来。 据我所知
  • What is the cleanest way to stop a python multiprocessing worker attached to a queue in an infinite loop?
    I'm implementing a producer-consumer pattern in python using multiprocessing.Pool and multiprocessing.Queue. Consumers are pre-forked processes that uses gevent to spawn multiple tasks. Here is a trimmed down version of code: import gevent from Queue import Empty as QueueEmpty from multiprocessing import Process, Queue, Pool import signal import time # Task queue queue = Queue() def init_worker (): # Ignore signals in worker signal.signal( signal.SIGTERM, signal.SIG_IGN ) signal.signal( signal.SIGINT, signal.SIG_IGN ) signal.signal( signal.SIGQUIT, signal.SIG_IGN ) # One of the worker task def
  • 如何在Python中进行并行编程?(How to do parallel programming in Python?)
    问题 对于C ++,我们可以使用OpenMP进行并行编程。 但是,OpenMP不适用于Python。 如果要并行处理python程序的某些部分,该怎么办? 该代码的结构可以认为是: solve1(A) solve2(B) 其中solve1和solve2是两个独立的函数。 为了减少运行时间,如何并行而不是按顺序运行这种代码? 代码是: def solve(Q, G, n): i = 0 tol = 10 ** -4 while i < 1000: inneropt, partition, x = setinner(Q, G, n) outeropt = setouter(Q, G, n) if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol: break node1 = partition[0] node2 = partition[1] G = updateGraph(G, node1, node2) if i == 999: print "Maximum iteration reaches" print inneropt 其中setinner和setouter是两个独立的功能。 那是我要平行的地方... 回答1 您可以使用多处理模块。 对于这种情况,我可以使用一个处理池: from
  • 我的Python进程在哪些CPU内核上运行?(On what CPU cores are my Python processes running?)
    问题 设置 我已经在Windows PC上用Python编写了一个非常复杂的软件。 我的软件基本上启动了两个Python解释器外壳。 双击main.py文件时,第一个外壳启动(我想)。 在该外壳中,其他线程以以下方式启动: # Start TCP_thread TCP_thread = threading.Thread(name = 'TCP_loop', target = TCP_loop, args = (TCPsock,)) TCP_thread.start() # Start UDP_thread UDP_thread = threading.Thread(name = 'UDP_loop', target = UDP_loop, args = (UDPsock,)) TCP_thread.start() Main_thread启动一个TCP_thread和一个UDP_thread 。 尽管它们是单独的线程,但它们都在一个Python Shell中运行。 Main_thread还启动一个子Main_thread 。 这可以通过以下方式完成: p = subprocess.Popen(['python', mySubprocessPath], shell=True) 从Python文档中,我了解到该子进程在单独的Python解释器会话/外壳中同时运行(!) 。 此子Main
  • Multiprocessing in a pipeline done right
    I'd like to know how multiprocessing is done right. Assuming I have a list [1,2,3,4,5] generated by function f1 which is written to a Queue (left green circle). Now I start two processes pulling from that queue (by executing f2 in the processes). They process the data, say: doubling the value, and write it to the second queue. Now, function f3 reads this data and prints it out. Inside the functions there is a kind of a loop, trying to read from the queue forever. How do I stop this process? Idea 1 f1 does not only send the list, but also a None object or a custon object, class
  • 使用多重处理时在Windows中强制使用if __name __ ==“ __ main__”(Compulsory usage of if __name__==“__main__” in windows while using multiprocessing [duplicate])
    问题 这个问题已经在这里有了答案: Windows上的python多重处理,如果__name__ ==“ __main __” (2个答案) 2年前关闭。 在Windows上的python中使用多处理时,可以保护程序的入口点。 该文档说:“确保新的Python解释器可以安全地导入主模块,而不会引起意外的副作用(例如,启动新进程)”。 谁能解释这到底是什么意思? 回答1 扩展一下您已经获得的好答案,如果您了解Linux-y系统的功能,则将有所帮助。 他们使用fork()产生新的进程,这有两个很好的结果: 主程序中存在的所有数据结构对子进程都是可见的。 它们实际上是在处理数据副本。 子进程在主程序中的fork()之后的指令处开始执行-因此,已在模块中执行的任何模块级代码都不会再次执行。 在Windows中无法使用fork() ,因此在Windows上,每个子进程都会重新导入每个模块。 所以: 在Windows上,子进程看不到主程序中存在的任何数据结构。 和, 所有模块级代码在每个子进程中执行。 所以,你需要想一下它的代码,你只想要在主程序执行。 最明显的例子是您希望创建子进程的代码仅在主程序中运行-因此应由__name__ == '__main__'保护。 作为一个更好的示例,请考虑构建一个巨大列表的代码,您打算将该列表传递给辅助进程以进行爬网。 您可能也希望对此加以保护
  • multiprocessing.Pool在Linux / Python2.7上的terminate()之后生成新的子对象?(multiprocessing.Pool spawning new childern after terminate() on Linux/Python2.7?)
    问题 我有一个可执行文件,需要使用不同的参数来经常运行。 为此,我按照此处给出的模式,使用多处理模块编写了一个小的Python(2.7)包装器。 我的代码如下所示: try: logging.info("starting pool runs") pool.map(run_nlin, params) pool.close() except KeyboardInterrupt: logging.info("^C pressed") pool.terminate() except Exception, e: logging.info("exception caught: ", e) pool.terminate() finally: time.sleep(5) pool.join() logging.info("done") 我的工作者函数在这里: class KeyboardInterruptError(Exception): pass def run_nlin((path_config, path_log, path_nlin, update_method)): try: with open(path_log, "w") as log_: cmdline = [path_nlin, path_config] if update_method: cmdline += [update
  • Python Multiprocessing atexit Error “Error in atexit._run_exitfuncs”
    I am trying to run a simple multiple processes application in Python. The main thread spawns 1 to N processes and waits until they all done processing. The processes each run an infinite loop, so they can potentially run forever without some user interruption, so I put in some code to handle a KeyboardInterrupt: #!/usr/bin/env python import sys import time from multiprocessing import Process def main(): # Set up inputs.. # Spawn processes Proc( 1).start() Proc( 2).start() class Proc ( Process ): def __init__ ( self, procNum): self.id = procNum Process.__init__(self) def run ( self ): doneWork
  • How to parallelize a training loop ever samples of a batch when CPU is only available in pytorch?
    I want to parallelize over single examples or batch of example (in my situation is that I only have cpus, I have up to 112). I tried it but I get a bug that the losses cannot have the gradient out of separate processes (which entirely ruins my attempt). I still want to do it and it essential that after the multiproessing happens that I can do an optimizer step. How do I get around it? I made a totally self contained example: import torch import torch.nn as nn from torch.optim.lr_scheduler import StepLR from torch.utils.data import Dataset, DataLoader from torch.multiprocessing import Pool