天道酬勤,学无止境

如何恢复传递给 multiprocessing.Process 的函数的返回值?(How can I recover the return value of a function passed to multiprocessing.Process?)

问题

在下面的示例代码中,我想恢复函数worker的返回值。 我该怎么做呢? 这个值存储在哪里?

示例代码:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

我似乎无法在存储在jobs的对象中找到相关属性。

回答1

使用共享变量进行通信。 例如像这样:

import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())
回答2

我认为@sega_sai 建议的方法更好。 但它确实需要一个代码示例,所以这里是:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

这将打印返回值:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

如果您熟悉map (内置的 Python 2),这应该不会太具有挑战性。 否则请查看 sega_Sai 的链接。

请注意只需要很少的代码。 (还要注意如何重用进程)。

回答3

对于正在寻求如何使用QueueProcess获取值的任何其他人:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}

请注意,在 Windows 或 Jupyter Notebook 中,对于multithreading您必须将其保存为文件并执行该文件。 如果您在命令提示符下执行此操作,您将看到如下错误:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
回答4

出于某种原因,我找不到如何在任何地方使用Queue执行此操作的一般示例(即使 Python 的文档示例也不会产生多个进程),所以这是我在尝试 10 次后得到的结果:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue是一个阻塞的、线程安全的队列,你可以用它来存储子进程的返回值。 所以你必须将队列传递给每个进程。 这里不太明显的是,您必须在join Process es 之前从队列中get() ,否则队列会填满并阻塞所有内容。

面向对象的更新(在 Python 3.4 中测试):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
回答5

此示例显示如何使用 multiprocessing.Pipe 实例列表从任意数量的进程返回字符串:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

此解决方案使用的资源少于使用 multiprocessing.Queue 的资源

  • 管道
  • 至少一个锁
  • 缓冲区
  • 一个线程

或使用 multiprocessing.SimpleQueue

  • 管道
  • 至少一个锁

查看每种类型的来源非常有指导意义。

回答6

您可以使用exit内置来设置进程的退出代码。 可以从进程的exitcode属性中获取:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
回答7

似乎您应该使用 multiprocessing.Pool 类并使用方法 .apply() .apply_async(), map()

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult

回答8

pebble 包有一个很好的抽象,利用multiprocessing.Pipe这使得这非常简单:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

示例来自:https://pythonhosted.org/Pebble/#concurrent-decorators

回答9

我想我会简化从上面复制的最简单的例子,在 Py3.6 上为我工作。 最简单的是 multiprocessing.Pool:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

您可以使用例如Pool(processes=5)设置池中的进程数。 但是它默认为 CPU 计数,因此对于 CPU 密集型任务将其留空。 (无论如何,I/O 绑定任务通常适合线程,因为线程大部分时间都在等待,因此可以共享一个 CPU 内核。) Pool还应用了分块优化。

(请注意,worker 方法不能嵌套在方法中。我最初在调用pool.map的方法中定义了我的 worker 方法,以保持它全部自包含,但随后进程无法导入它,并且抛出“AttributeError: Can't pickle local object outer_method..inner_method”。更多在这里。它可以在一个类中。)

(欣赏指定打印'represent!'而不是time.sleep()的原始问题,但没有它,我认为有些代码在并发运行时不是。)


Py3 的 ProcessPoolExecutor 也是两行( .map返回一个生成器,所以你需要list() ):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

使用普通流程:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

如果您只需要putget请使用 SimpleQueue 。 第一个循环启动所有进程,然后第二个循环进行阻塞queue.get调用。 我不认为有任何理由也调用p.join()

回答10

一个简单的解决方案:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
回答11

如果你使用 Python 3,你可以使用 concurrent.futures.ProcessPoolExecutor 作为一个方便的抽象:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
回答12

我修改了 vartec 的答案,因为我需要从函数中获取错误代码。 (感谢 vertec !!!这是一个很棒的技巧)

这也可以使用manager.list来完成,但我认为最好将它放在字典中并在其中存储一个列表。 这样,我们就可以保留函数和结果,因为我们无法确定填充列表的顺序。

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # name was set up in the Process line 53

    # here will only show the function that worked and where able to populate the 
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j

受限制的 HTML

  • 允许的HTML标签:<a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • 自动断行和分段。
  • 网页和电子邮件地址自动转换为链接。

相关推荐
  • 如何恢复传递给multiprocessing.Process的函数的返回值?(How can I recover the return value of a function passed to multiprocessing.Process?)
    问题 在下面的示例代码中,我想恢复函数worker的返回值。 我该怎么做呢? 此值存储在哪里? 示例代码: import multiprocessing def worker(procnum): '''worker function''' print str(procnum) + ' represent!' return procnum if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) jobs.append(p) p.start() for proc in jobs: proc.join() print jobs 输出: 0 represent! 1 represent! 2 represent! 3 represent! 4 represent! [<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>] 我似乎无法找到存储在对象的相关属性jobs 。 回答1
  • 多处理全局变量更新未返回给父级(multiprocessing global variable updates not returned to parent)
    问题 我正在尝试从子流程中返回值,但是不幸的是这些值是无法拾取的。 因此,我在线程模块中成功使用了全局变量,但在使用多处理模块时却无法检索在子流程中完成的更新。 我希望我想念一些东西。 最后给出的结果始终与给定vars dataDV03和dataDV04初始值相同。 子进程正在更新这些全局变量,但是这些全局变量在父级中保持不变。 import multiprocessing # NOT ABLE to get python to return values in passed variables. ants = ['DV03', 'DV04'] dataDV03 = ['', ''] dataDV04 = {'driver': '', 'status': ''} def getDV03CclDrivers(lib): # call global variable global dataDV03 dataDV03[1] = 1 dataDV03[0] = 0 # eval( 'CCL.' + lib + '.' + lib + '( "DV03" )' ) these are unpicklable instantiations def getDV04CclDrivers(lib, dataDV04): # pass global variable dataDV04['driver'
  • Python多进程及多线程基础
    关于进程和线程的基础知识,之前已经分享过一些文章,下面把一些基础知识,再总结下(重点:面试常问):启动一个程序,就默认创建一个主进程,在一个主进程中默认创建一个主线程进程是系统资源分配和调度的基本单位,线程存在于进程中,线程是CPU调度和分配的基本单位。进程之间相互独立,同一个变量,多进程中各自会拷贝一份,而同一个进程的多个线程是共享内存的,所有变量都由所有线程共享,从而提升程序的运行效率。进程之间相互独立,一个进程的崩溃不会影响其他进程,而线程包含在进程之中,如果线程崩溃,则会导致其他线程崩溃,当然也会导致该进程崩溃。所以多进程开发模式要比多线程模式健壮性要强。进程的运行状态1)新建状态:该进程正在被创建,尚未转到就绪状态。2)就绪状态:所有运行条件都已满足,正在等待CPU。3)运行状态(执行窗台):进程正在处理器上运行。4)阻塞状态:进程正在等待某一事件而暂停运行。如等待可用资源或等待输入输出完成。即使处理器空闲,该进程也不能运行。5)死亡状态:进程正在从系统中消失。进程的三个基本状态是可以相互转换的就绪——>运行:当进程获得处理器时,由就绪状态转为运行状态。运行——>就绪:当进程被剥夺处理器时,如用完系统分配给他的时间片,出现更高级别的其它进程,进程由运行状态转为就绪状态。运行——>阻塞:当运行进程因某事件受阻,如所申请资源被占用,启动I/O传输未完成
  • 为python多处理池中的worker获取唯一的ID(Get a unique ID for worker in python multiprocessing pool)
    问题 有没有一种方法可以为python多处理池中的每个工作程序分配一个唯一的ID,从而使池中特定工作程序正在运行的作业可以知道哪个工作程序正在运行它? 根据文档,一个Process有一个name但是 名称是仅用于标识目的的字符串。 它没有语义。 多个进程可以使用相同的名称。 对于我的特定用例,我想在一组四个GPU上运行一堆作业,并且需要为运行该作业的GPU设置设备编号。 由于作业的长度不一致,因此我想确保在上一个作业完成之前,尝试在该作业上运行的作业在GPU上没有冲突(因此这避免了将ID预先分配给作业工作单元)。 回答1 似乎您想要的很简单: multiprocessing.current_process() 。 例如: import multiprocessing def f(x): print multiprocessing.current_process() return x * x p = multiprocessing.Pool() print p.map(f, range(6)) 输出: $ python foo.py <Process(PoolWorker-1, started daemon)> <Process(PoolWorker-2, started daemon)> <Process(PoolWorker-3, started daemon)>
  • 如何在 Python 中对类实例使用多处理?(How to use multiprocessing with class instances in Python?)
    问题 我正在尝试创建一个类,它可以运行一个单独的进程来完成一些需要很长时间的工作,从主模块启动一堆这些,然后等待它们全部完成。 我想启动一次流程,然后继续为他们提供要做的事情,而不是创建和破坏流程。 例如,也许我有 10 个服务器运行 dd 命令,然后我希望它们都 scp 文件等。 我的最终目标是为每个系统创建一个类来跟踪系统的信息,例如 IP 地址、日志、运行时等。但该类必须能够启动系统命令然后返回在该系统命令运行时执行返回给调用者,以便稍后跟进系统命令的结果。 我的尝试失败了,因为我无法通过 pickle 通过管道将类的实例方法发送到子进程。 那些不是腌制的。 因此,我尝试以各种方式修复它,但我无法弄清楚。 如何修补我的代码以执行此操作? 如果你不能发送任何有用的东西,多处理有什么好处? 是否有与类实例一起使用的多处理的良好文档? 我可以让多处理模块工作的唯一方法是使用简单的函数。 每次在类实例中使用它的尝试都失败了。 也许我应该通过事件来代替? 我还不明白该怎么做。 import multiprocessing import sys import re class ProcessWorker(multiprocessing.Process): """ This class runs as a separate process to execute worker's
  • Python - 如何将全局变量传递给 multiprocessing.Process?(Python - How to pass global variable to multiprocessing.Process?)
    问题 我需要在一段时间后终止一些进程,所以我使用睡眠另一个进程来等待。 但是我猜新进程无法从主进程访问全局变量。 请问我该如何解决? 代码: import os from subprocess import Popen, PIPE import time import multiprocessing log_file = open('stdout.log', 'a') log_file.flush() err_file = open('stderr.log', 'a') err_file.flush() processes = [] def processing(): print "processing" global processes global log_file global err_file for i in range(0, 5): p = Popen(['java', '-jar', 'C:\\Users\\two\\Documents\\test.jar'], stdout=log_file, stderr=err_file) # something long running processes.append(p) print len(processes) # returns 5 def waiting_service(): name =
  • Python 多处理进程静默崩溃(Python multiprocessing Process crashes silently)
    问题 我正在使用 Python 2.7.3。 我使用子类multiprocessing.Process对象并行化了一些代码。 如果我的子类 Process 对象中的代码没有错误,则一切正常。 但是,如果我的子类 Process 对象中的代码中有错误,它们显然会无声地崩溃(没有堆栈跟踪打印到父 shell)并且 CPU 使用率将降至零。 父代码永远不会崩溃,给人的印象是执行只是挂起。 同时,很难追踪代码中的错误在哪里,因为没有给出错误在哪里的指示。 我在 stackoverflow 上找不到处理相同问题的任何其他问题。 我猜子类的 Process 对象似乎无声地崩溃,因为它们无法向父级的 shell 打印错误消息,但我想知道我能做些什么,以便我至少可以更有效地调试(以及其他我的代码的用户也可以在遇到问题时告诉我)。 编辑:我的实际代码太复杂了,但是一个带有错误的子类 Process 对象的简单示例将是这样的: from multiprocessing import Process, Queue class Worker(Process): def __init__(self, inputQueue, outputQueue): super(Worker, self).__init__() self.inputQueue = inputQueue self.outputQueue =
  • 如何检索通过 multiprocessing.Process 调用的函数返回的多个值(How to retrieve multiple values returned of a function called through multiprocessing.Process)
    问题 我有一个这样的场景: for each in content : pdf_output,job_id=createpdf(each) if pdf_output : pdf_output = pdf_output + pdf_output 我正在尝试并行化整个过程。像这样 jobs=[] for each in content : jobs.append(multiprocessing.Process(target=self.createpdf, args=(content))) for each in jobs : jobs.start() for each in jobs : jobs.join() 我如何明智地完成任务 if pdf_output : pdf_output = pdf_output + pdf_output 每个工作? 如何检索 createpdf 发送的 2 个返回值并对其进行处理? 我认为 multiprocessing.Queue 是一个线索,但我该如何实现呢? 回答1 如此简单的任务不需要队列。 我建议使用池。 Pool.map方法可以将一个函数并行应用于一系列值: import multiprocessing def createpdf(data): return ("This is my pdf data: %s\n" % data, 0)
  • 如何在Python中使用多处理队列?(How to use multiprocessing queue in Python?)
    问题 我很难理解多处理队列如何在python上工作以及如何实现它。 可以说我有两个python模块,它们从共享文件访问数据,让我们将这两个模块称为writer和Reader。 我的计划是让读取器和写入器都将请求放入两个单独的多处理队列中,然后让第三个进程将这些请求循环弹出并照此执行。 我的主要问题是我真的不知道如何正确实现multiprocessing.queue,您无法为每个进程真正实例化对象,因为它们将是单独的队列,如何确保所有进程都与一个共享队列相关(或在这种情况下,排队) 回答1 我的主要问题是我真的不知道如何正确实现multiprocessing.queue,您无法为每个进程真正实例化对象,因为它们将是单独的队列,如何确保所有进程都与一个共享队列相关(或在这种情况下,排队) 这是一个读取器和写入器共享一个队列的简单示例。 当写入器的数字用完时,它将发送“ DONE”(完成),让读者知道要退出读取循环。 from multiprocessing import Process, Queue import time import sys def reader_proc(queue): ## Read from the queue; this will be spawned as a separate Process while True: msg = queue.get() #
  • 当我调用 multiprocessing.Process 时正在腌制什么?(What is being pickled when I call multiprocessing.Process?)
    问题 我知道multiprocessing使用酸洗来让进程在不同的 CPU 上运行,但我想我对酸洗的内容有点困惑。 让我们看看这段代码。 from multiprocessing import Process def f(I): print('hello world!',I) if __name__ == '__main__': for I in (range1, 3): Process(target=f,args=(I,)).start() 我假设正在腌制的是def f(I)和参数。首先,这个假设是否正确? 其次,假设f(I)有一个函数调用,例如: def f(I): print('hello world!',I) randomfunction() randomfunction的定义是否也被腌制,还是只是函数调用? 此外,如果该函数调用位于另一个文件中,进程是否能够调用它? 回答1 在此特定示例中,腌制的内容取决于平台。 在支持os.fork系统上,比如 Linux,这里没有任何东西被腌制。 您传递的目标函数和参数都通过fork由子进程继承。 在不支持fork平台上,如 Windows, f函数和args元组都将被腌制并发送到子进程。 子进程将重新导入您的__main__模块,然后解开函数及其参数。 在任何一种情况下, randomfunction函数实际上都没有被腌制。
  • multiprocessing.value 清除语法?(multiprocessing.value clear syntax?)
    问题 我想使用 multiprocessing.Value 在多个进程中使用一个变量,但 Python 文档中的语法不清楚。 谁能告诉我应该使用什么作为类型(我的变量是一个字母),以及在哪里放置我的变量名称? 编辑 我尝试使用管理器在进程之间共享我的信件。 但我现在唯一拥有的是Value('ctypes.c_char_p', ' (The key you hit here) ')打印在 Python Shell 中,但仍然没有声音。 使用管理器时,控制台似乎也比平时慢一些。 在我按下键和Value出现在屏幕上之间有将近一秒的延迟。 我的代码现在看起来像这样: #Import from tkinter import * import wave import winsound import multiprocessing #Functions def key(event): temp = event.char manager = multiprocessing.Manager() manager.Value(ctypes.c_char_p, temp) hitkey = manager.Value(ctypes.c_char_p, temp) instance = multiprocessing.Process(target=player, args=(hitkey,))
  • 计算在多处理中执行的任务总数。执行期间的池(Counting total number of tasks executed in a multiprocessing.Pool during execution)
    问题 我很乐意说明我们目前的谈话。 我正在从事农业锻炼,想知道当前的进展。 因此,如果我向10处理器发送了100个作业,那么如何显示当前返回的作业数量是多少。 我可以获取ID,但是如何从我的地图函数中计算完成的返回作业的数量。 我正在按以下方式调用我的函数: op_list = pool.map(PPMDR_star, list(varg)) 在我的函数中,我可以打印当前名称 current = multiprocessing.current_process() print 'Running: ', current.name, current._identity 回答1 如果使用pool.map_async ,则可以从返回的MapResult实例中提取此信息。 例如: import multiprocessing import time def worker(i): time.sleep(i) return i if __name__ == "__main__": pool = multiprocessing.Pool() result = pool.map_async(worker, range(15)) while not result.ready(): print("num left: {}".format(result._number_left)) time.sleep(1
  • 使用多处理时 cv2.Boost 的 Pickle 异常(Pickle exception for cv2.Boost when using multiprocessing)
    问题 我正在研究名为“Faciel Actions Units Detection”的项目我正在使用 python2.7 和 opencv 2.4 错误: pickle.PicklingError: Can't pickle <type 'cv2.Boost'>: it's not the same object as cv2.Boost 从屏幕截图转录的部分回溯: Loading classifier for action unit 27 Traceback (most recent call last): File "C:\Python27\audetect-master\audetect-interactive.py", line 59, in <module> main() File "C:\Python27\audetect-master\audetect-interactive.py", line 18, in main active_aus = detector.detect() File "C:\Python27\audetect-master\detect.py", line 67, in detect initial_points = self.ffdetector.locate_features(first) File "C:\Python27
  • Python进程池Pool的使用
    1.进程池Pool 需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。 初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。 请看下面的实例: import multiprocessing import time def copy_work(): print("拷贝中....",multiprocessing.current_process().pid) time.sleep(0.3) if __name__ == '__main__': # 创建进程池, Pool(3) 表示创建容量为3个进程的进程池 pool = multiprocessing.Pool(3) for i in range(10): # 利用进程池同步拷贝文件,进程池中的进程会必须等上一个进程退出才能执行下一个进程 # pool.apply(copy_work) pool.apply_async(copy_work) pool.close() #
  • Python程序中的进程操作-进程间通信(multiprocess.Queue)
    目录一、进程间通信二、队列2.1.1 方法介绍2.1.2 其他方法(了解)2.1 概念介绍——multiprocess.Queue三、代码实例——multiprocess.Queue3.1 单看队列用法3.2 子进程发送数据给父进程3.3 批量生产数据放入队列再批量获取结果四、生产者消费者模型4.1 为什么要使用生产者和消费者模式4.2 什么是生产者消费者模式4.3 基于队列实现生产者消费者模型4.4 改良版——生产者消费者模型4.5 主进程在生产者生产完毕后发送结束信号None4.6 多个消费者的例子:有几个消费者就需要发送几次结束信号五、JoinableQueue([maxsize])5.1 方法介绍5.2 JoinableQueue队列实现消费之生产者模型一、进程间通信IPC(Inter-Process Communication)二、队列2.1 概念介绍——multiprocess.Queue创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。Queue([maxsize])创建共享的进程队列。参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。2.1.1 方法介绍Queue([maxsize]) :创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数
  • Python学习笔记 进程
    目录  - 创建进程     ~ multiprocessing.Process 类     ~ 自定义的进程类  - 进程间同步(锁)     ~ 互斥锁(Lock)     ~ 可重入锁(RLock)  - 进程间通信     ~ 管道(Pipe)     ~ 队列(Queue)     ~ 共享内存(Value 和 Array)     ~ 服务进程(Manager)  - 生产者消费模型  - 进程池  - 系统启动进程的方式(了解)  - 名词解释     ~ 并发和并行     ~ 同步和异步     ~ 阻塞和非阻塞     ~ 进程     ~ 守护进程     ~ 僵尸进程     ~ 孤儿进程      创建进程   所谓进程,就是当前正在执行的程序。比如你打开了一个qq,这时就会产生一个进程。   在 Python 中,我们通过两种方式创建一个进程,一是通过 multiprocessing 模块提供的 Process 类,二是我们通过对 Process 类的继承自定义的一个进程类。下面将介绍这两种创建进程的方式,并示例 multiprocessing.Process 类   Process 是在 multiprocessing 模块中实现的一个类,我们通过对该类进行实例化来创建一个进程对象,并通过调用该类实例的 start() 方法来启动一个进程
  • 深入理解计算机系统(3.7)------过程(函数的调用原理)
    深入理解计算机系统(3.7)------过程  上篇博客我们讲解了计算机汇编语言是如何实现循环结构的。本篇博客我们将介绍汇编语言中过程的实现方式。  过程在高级语言中也称为函数,方法。一个过程的调用包括将数据(以过程参数和返回值的形式)和控制从代码的一部分传递到另一部分。此外,它还必须在进入时为过程的局部变量分配空间,并在退出时释放空间。大多数机器,包括我们一直讲的 IA32,只提供转移控制到过程和从过程中转移出控制这种简单指令。数据传递和局部变量的分配释放都是通过操纵程序栈来实现。  合理的构建方法并调用,能大大增加代码的复用性,也能使代码结构更加清晰,接下来我们就来详细的介绍。 1、栈帧结构  IA32 程序用程序栈来支持过程调用。机器用栈来传递过程参数、存储返回信息、保存寄存器用于以后恢复,以及本地存储。而为单个过程分配的那部分栈称为帧栈(stack frame)。  帧栈可以认为是程序栈的一段,它有两个端点,一个标识着起始地址,一个标识着结束地址,而这两个地址,则分别存储在固定的寄存器当中,即起始地址存在%ebp寄存器当中,结束地址存在%esp寄存器当中。也就是说寄存器 %ebp 为帧指针,寄存器 %esp 为栈指针。  当程序执行时,栈指针可以移动,因此大多数信息的访问都是相对于帧指针的。     这个图基本上已经包括了程序栈的构成,它由一系列栈帧构成
  • multiprocessing.Queue 作为 arg 以池工作人员中止工作人员的执行(multiprocessing.Queue as arg to pool worker aborts execution of worker)
    问题 我实际上发现很难相信我遇到了我遇到的问题,这似乎是 python 多处理模块中的一个大错误......无论如何,我遇到的问题是每当我通过一个 multiprocessing.Queue 到 multiprocessing.Pool worker 作为参数池 worker 从不执行其代码。 即使在一个非常简单的测试中,我也能够重现这个错误,该测试是在 python 文档中找到的示例代码的稍微修改版本。 这是队列示例代码的原始版本: from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join() 这是我对队列示例代码的修改版本: from multiprocessing import Queue, Pool def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Pool(1) p.apply_async(f
  • 使用 importlib 选择一个模块并在多处理函数中使用(Select a module using importlib and use in multiprocessing functions)
    问题 我想在我的主函数中根据传递给 Python 脚本的参数选择要导入的模块。 所以,我正在使用其中之一 blah = importlib.import_module("blah1") blah = importlib.import_module("blah2") 其中“blahX”是同一接口的不同实现。 我还想使用multiprocessing模块将工作传递给不同的进程。 blah = None def f(a, b): print blah.f(a,b) if __name__ == '__main__': # call importlib.import_module here... a = 1 b = 2 p = multiprocessing.Process(target=f, args=(a, b)) p.start() p.join() 问题是传递给multiprocessing.Process的函数不知道我在 main 中导入的模块。 这与我使用import import blah1 as blah #import blah2 as blah 但后来我失去了在运行时选择模块的能力。 我该如何修复这个设计? 回答1 当您调用mp.Process(...) ,多处理模块会派生一个子进程(在 Unix 上)或启动一个新的 Python 进程并导入调用模块(在 Windows
  • Python多重处理:按值传递对象?(Python multiprocessing: object passed by value?)
    问题 我一直在尝试以下方法: from multiprocessing import Pool def f(some_list): some_list.append(4) print 'Child process: new list = ' + str(some_list) return True if __name__ == '__main__': my_list = [1, 2, 3] pool = Pool(processes=4) result = pool.apply_async(f, [my_list]) result.get() print 'Parent process: new list = ' + str(my_list) 我得到的是: Child process: new list = [1, 2, 3, 4] Parent process: new list = [1, 2, 3] 因此,这意味着my_list是按值传递的,因为它没有突变。 那么,是否将规则传递给另一个进程时确实是按值传递的规则呢? 谢谢。 回答1 正如AndréLaszlo所说, multiprocessing库需要对传递给multiprocessing.Pool方法的所有对象进行腌制,以便将它们传递给工作进程。 酸洗过程导致在工作进程中创建一个不同的对象