【やってみた】multiprocessingを用いてipykernelにcpu並列計算を実装してみた

やりたいこと

 パラメータ(a,b)を持つ二変数関数V(x,y)の最小値を、パラメータ値を変えたときごとに計算したかった*1のですが、めちゃくちゃ時間がかかりました。しかし、やることは異なるパラメータで同じ計算を繰り返すだけなのですから、計算ごとの結果が同期しているわけでも無いし、1つのcpuコアに全部回させるよりも複数のコアに頑張ってもらう方がはるかに効率的です。
 ので、for文でパラメータを回す部分を、並列処理してもらうことにしました。

結論

 こういう感じにするとうまくいきます。
 (関数定義)の target_func を好きな関数に置き換えて、(実行)の target_func をその関数の名前に、 kwargs を適当な引数に置き換えて使えます。
 環境:Jupyter Notebook の ipykernel を使用
    Windows 11, version 23H2 と MacBook Pro Sonoma 14.2.1

 サンプルコード:
(コードの大部分は
zenn.devのものをいただきました。ありがとうございます。)

(関数定義)

# Windows ipykernel know only multiprocess
import multiprocess as mp
from multiprocess import Manager
# For Mac, use below instead
#import multiprocessing as mp
#from multiprocessing import Manager

def target_func(returned_dict, paraArray, paraNum, tol):
        # import all packages for the function
    import numpy as np
    import scipy as scp
    import scipy.optimize as optim
        # below is sample target function
    print(paraNum, "start")
    def test_func(x, a):
        return x**2 + 2*a*x + (1+a**2)/2
    Va = lambda x: test_func(x, a = paraArray[paraNum])
    x_ini = 0
    res = optim.minimize(Va, x_ini, method = 'Nelder-Mead', tol = tol)
    minVa = res.fun
    if minVa < 0:
        returned_dict[paraNum] = 0
    elif minVa == 0:
        returned_dict[paraNum] = 1
    elif minVa > 0:
        returned_dict[paraNum] = 2
    print(paraNum, "end")

# multiprocessing

# make num_pool numbers processes
        # (At most) num_pool processes work simultaneously
def parallel_Pool(target, kwargs, num_pool):
        # limit the number of the processes working simultaneously by num_pool
    p = mp.Pool(num_pool)
        # for mac, since original get_start_method is 'spawn', change it into the mac default: 'fork'
    #from multiprocessing import get_context
    #p = get_context("fork").Pool(num_pool)
    for kwarg in kwargs:
        p.apply_async(func=target, kwds=kwarg)
    p.close()
    p.join()
    print('All processes done')

# Process: do num_proc calc. and end them all simultaneouly, then redo num_proc calc.
def parallel_Process(target, kwargs, num_proc):
        # limit the number of the processes working simultaneously by num_proc
    jobs = []
    proc = 0
    rest = len(kwargs)
    for kwarg in kwargs:
            # limit number = num_proc
            # under the limit, do Process
        if proc < num_proc:
            p = mp.Process(target=target, kwargs=kwarg)
            # for mac, since original get_start_method is 'spawn', change it into the mac default: 'fork'
            #from multiprocessing import get_context
            #p = get_context("fork").Process(target=target, kwargs=kwarg)
            jobs.append(p)
            p.start()
            proc += 1
            rest -= 1
                # On the limit, close the processes and reset the number of working processes
            if proc == num_proc or rest == 0:
                print('%s process was working, and rest process is %s.'%(num_proc, rest))
                for job in jobs:
                    job.join()
                proc = 0
                jobs = []
    print('All processes done')

(実行)

%%time
import numpy as np

paraArray = np.linspace(-5, 5, 2000)
tol = 1e-5

# Multiprocessing_make dict
manager = mp.Manager()
returned_dict = manager.dict()

if __name__ == '__main__':
    num_pool = 4
    kwargs = [
        {'returned_dict':returned_dict, 'paraArray':paraArray, 'paraNum':paraNum, 'tol':tol}
        for paraNum in range(len(paraArray))
        ]
    parallel_Pool(target_func, kwargs, num_pool)
        # with changing into below, calculate by mp.Processing
    #parallel_Process(target_func, kwargs, num_proc=num_pool)

# get result
res = np.zeros(len(paraArray))
for i in range(len(paraArray)):
    res[i] = returned_dict[i]

# make figure
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot(paraArray, res)
plt.show

 以下、コードのポイントをぐだぐだ話します。

これはもくじ

これはあみだくじ

これはあみだくじ
(Asanagi - 投稿者自身による著作物, CC0, https://commons.wikimedia.org/w/index.php?curid=117860065による)

解説

シチュエーション(target_func)

 コードの意味を分かりやすくするためにも、どうせなら具体的な系を扱いたいので、簡単な例として、次の問題を解くことを考えます。

問:  a が実数全体を動くとき、双曲線  2x^2 - y^2 = 1 と直線  y = 2x + a との共有点の個数を与えよ。

 これは  y を双曲線の方程式に代入してできる二次方程式  2x^2 + 4ax + a^2 + 1 = 0 がいくつ解を持つか、に帰着し、判別式の符号から判定できます。ここでは正攻法からあえて外れて、「二次方程式が解を2つ持つ  \Leftrightarrow 二次関数  f(x) = 2x^2 + 4ax + a^2 + 1 の最小値が負」に着目し、パラメータ  a を変化させたときの  f(x) の最小値を考えることにします。
 そこで、次のような最適化関数を取りましょう。

def target_func(returned_dict, paraArray, paraNum, tol):
        # import all packages in the function
    import numpy as np
    import scipy as scp
    import scipy.optimize as optim
        # below is sample target function
    print(paraNum, "start")
    def test_func(x, a):
        return x**2 + 2*a*x + (1+a**2)/2
    Va = lambda x: test_func(x, a = paraArray[paraNum])
    x_ini = 0
    res = optim.minimize(Va, x_ini, method = 'Nelder-Mead', tol = tol)
    minVa = res.fun
    if minVa < 0:
        returned_dict[paraNum] = 0
    elif minVa == 0:
        returned_dict[paraNum] = 1
    elif minVa > 0:
        returned_dict[paraNum] = 2
    print(paraNum, "end")

 paraArray はパラメータ a の配列、 paraNum はその何番目を取るかの変数です。returned_dict は結果を保存する辞書型変数で、後で説明します。
 まず、配列を扱い、最適化を行うためのパッケージをインポートします。次に target_func 内の test_func とその後の Va で a が paraArray[paraNum] の値のときの二次関数を与え、その最小値を最適化関数のパッケージ optimize.minimize を使って求めます。最適化には初期値が必要で、とりあえず 0 を入れておきます。tol は最適化をストップするときの許容誤差で、計算時間が短くなるよう適当な値にしておきます。やりたければ、特定の method を指定したり、最小化するときの変数 x の境界を設定することもできます。同時に複数の変数を回すときにはリストの形で

x_ini = [x0, x1]
x_bound = optim.Bounds(lb = [x0lb, x1lb], ub = [x0ub, x1ub])
res = optim.minimize(Vx, x_ini, method = 'Nelder-Mead', tol = tol, bound = x_bound)

などと与えましょう。ここで用いた Nelder-Mead 法はヤコビアンやヘシアンを指定せず使える最適化手法です。他の方法や指定パラメータはこちらを参照してください:
docs.scipy.org

 さて、最適化の結果は res の中に収納されており、最小値は res.fun 、最小値を与える引数の値は res.x で呼び出せます*2。f(x) の最小値が負であれば共有点は2つになるので、 res.fun の値を参照して結果を求めます。
 returned_dict は辞書型変数で、はじめは空で与えられ、これに逐一結果を書き込みます。これは並列計算に使えるパッケージ multiprocessing を使って、

manager = mp.Manager()
returned_dict = manager.dict()

から導入することができます。
qiita.com

ここでのプログラム target_func では最後に "returned_dict[paraNum] = n" の形で書き込んでいます。こうしておくと、

res = np.zeros(len(paraArray))
for i in range(len(paraArray)):
    res[i] = returned_dict[i]

のようにして値を取り出すことができます*3。中身全体を見るには

str(returned_dict)

で見られます。

 (実行)に書いたように、パラメータ a の paraArray が -5 から +5 まで 2000点を取るときには、target_func の paraNum を 0 から1999 まで動かしたときに得られる結果をまとめてやればよいでしょう。1プロセスのみでやる場合には、

import numpy as np
paraArray = np.linspace(-5, 5, 2000)
tol = 1e-5

manager = mp.Manager()
returned_dict = manager.dict()

res = np.zeros(len(paraArray))
for i in range(len(paraArray)):
    target_func(returned_dict, paraArray, i, tol)
    res[i] = returned_dict[i]

とすればよく、以下のようなプロットが得られます。

パラメータ a を動かしたときの共有点の数の変化

 ということで  a = \pm 1 のときに共有点の数が 0 から 2 へ飛ぶことが分かりました。めでたしめでたし。

 並列化の実装に先んじて、並列化時のポイントを述べておきます。
 これは私の環境だけかもしれませんが、並列化したい関数を"外"で定義していたところ*4、"name 'np' is not defined" のようなエラーが出ました。それを回避するには、使うパッケージ、使う関数、その他を全て並列化したい関数の中に書く必要があります。だから上記のサンプルコードではインポートや関数定義をすべて target_func の定義の中に入れていたんですね。
 理由はよく分かりませんが、並列化で動かす子プロセスは親が動いているところのパッケージが見えていないんじゃないかみたいな言説があり、なるほどなぁと思いました*5
stackoverflow.com

 プログラム中にある print コマンドは、いま実行している番号と実行が終わった番号が分かるように入れたものです。不必要ならコメントアウトするか消しましょう。
 私のMacくんはなぜか同時に複数の print を行ってきて、
"143 start start start"
みたいなことになりますし、プロット数が多くなると print が混雑してダメになります。ひとつひとつがすぐ終わるようなときには不要な負荷になりそうなので取った方がいいかもしれません。


並列化のパッケージ:multiprocessing

 以上の計算を並列化します。先のプログラムは、パラメータ a の配列 paraArray と配列の中の番号 paraNum を取って、対応する a の値で計算するものだったので、それぞれの paraNum での計算を別のプロセスに投げて並列化しましょう。
 有名な[定義?]並列計算は threading, Process, Pool がありますが*6、ネットで調べたら後者2つが強そうだったので、 multiprocessing に手を出すことにしました。
qiita.com
qiita.com
 並列計算には multiprocessing というパッケージをインポートします。

import multiprocessing as mp
from multiprocessing import Manager

 ちなみに、私の Windows PC w/ Jupyter Notebook の ipykernel では multiprocessing はなぜか読んでくれませんでした。そこで、代わりに multiprocess を入れます。

import multiprocess as mp
from multiprocess import Manager

 多分どっちでも、統一されていればオッケーだと思います。知らんけど。

並列化の実行:Pool

 さて、multiprocessing には Process と Pool の2つがあります。なんとなく、Process は1つずつプロセスをどんどん作っていく感じ、Pool はいくつかのプロセスの束を持っておき、それぞれを動かしていく感じです。まず Pool から見てみます。
 このサンプルコードでは、Pool を用いて一般の関数を並列化する関数を定義しています。

def parallel_Pool(target, kwargs, num_pool):
    p = mp.Pool(num_pool)
    for kwarg in kwargs:
        p.apply_async(func=target, kwds=kwarg)
    p.close()
    p.join()
    print('All processes done')

 見やすくするためにコメントをいくつか削りました。
 まず mp.Pool で num_pool 個の子プロセスを作ります。Pool ではこの数ぶんだけのプロセスが一度に動いており、どれか1つのプロセスが完了すると、そのプロセスが次の仕事を始めます。
 たいへんな問題として、Macでは上のコードを実行すると 'AttributeError: Can't get attribute "xxx"' と出ることがあります。これはOSXの start method が fork なのに、multiprocessing では spawn になっているのがやばいそうです。???
 ので、上の2行目のところを以下のように書き換えます。

from multiprocessing import get_context
p = get_context("fork").Pool(num_pool)

zenn.dev
 戻って、各プロセスの仕事のやり方は、 apply_async のように指定できます。apply_async は非同期的にプロセスを動かすという意味です。ほかに map と apply があり、前者は変数の入れ方が少し違います。後者は同期的に実行するので並列化されません。
qiita.com
docs.python.org
 ここで使った apply_async は(並列化する関数、その関数の引数)を引数として持ちます。とくに、関数の引数のリスト kwargs を用意しておいて、リストの要素 kwarg で apply_async のループを回してやれば、各引数ごとの計算が異なるプロセスで並列的に行われていきます。
 先では1プロセス中の for文でパラメータのループを回して計算していたのが、これでめでたく並列化できたわけです。

 ちなみに、ただ子プロセスを開始しただけでは、その完了を待たずに実行が終了してしまうので、 join() を入れて完了を待ちます。Pool では join() の前に close() しておく必要があります。
qiita.com
 全ての処理が終わった後に完了を通知するようにしましたが、別にいらないと思います。私は使ってないです。


 上記の関数を使って並列計算を実行するには、以下のようにします。

if __name__ == '__main__':
    num_pool = 4
    kwargs = [
        {'returned_dict':returned_dict, 'paraArray':paraArray, 'paraNum':paraNum, 'tol':tol}
        for paraNum in range(len(paraArray))
        ]
    parallel_Pool(target_func, kwargs, num_pool)

 はじめに書いてあるのはおまじないです。
docs.python.org
 num_pool が 4 なので、同時に4個までのプロセスを動かします。動かす数の目安はcpuのコア数で、多すぎても振り分けがスタックしてかえって遅くなります。一応、

from multiprocessing import cpu_count
cpu_count()

から確認できます。

 kwargs には並列化する関数の引数のリストを入れます。1プロセスのときに paraNum を for文で回して計算したように、parallel_Pool が paraNum について並列化してくれるよう、変数のリスト kwargs を作るためのループを paraNum で回します。

 変数を辞書型で渡しているのは、 target_func 内で kwarg から変数を取り出さずとも、 kwarg に書いている変数の名前のまま使えて楽ちんだからです。
 計算結果をプロットするには

res = np.zeros(len(paraArray))
for i in range(len(paraArray)):
    res[i] = returned_dict[i]

などして適当な配列に入れてやれば、matplotlib などで好きにプロットできます*7

 一般の関数を並列化するときにも、並列化したい関数を self-contained に定義しておいて、上のプログラムの target_func の部分をその関数名に書き換え、kwargs を適当に置き換えれば、簡単に並列計算ができます。うれしいですね。

並列化の実行:Process

 最後に Process の方の説明をします。Process では Process(target, kwargs) でプロセスを作ります。Pool と違い1つずつ作るので、それぞれ start() で開始を宣言します。また、終わったプロセスは終わったままになるので、kwargs の要素の数ぶんだけ、プロセスを作って実行します。あんまりたくさんプロセスを作っても一度に作業できるcpuコア数には限りがあるので、ある程度の数で区切って、その数ずつ実行するのが良いでしょう。

def parallel_Process(target, kwargs, num_proc):
    jobs = []
    proc = 0
    rest = len(kwargs)
    for kwarg in kwargs:
        if proc < num_proc:
            p = mp.Process(target=target, kwargs=kwarg)
            jobs.append(p)
            p.start()
            proc += 1
            rest -= 1
            if proc == num_proc or rest == 0:
                print('%s process was working, and rest process is %s.'%(num_proc, rest))
                for job in jobs:
                    job.join()
                proc = 0
                jobs = []
    print('All processes done')

 サンプルコードでは、num_proc を同時に動かすプロセス数と決めて、その分だけ実行したらプロセスを jobs に溜め、プロセスが終了するのを join() で待ちます。そして終了を確認したら再度プロセスを作っていきます。
 ここでもMacでは7行目の mp.Process を次に置き換えます。

from multiprocessing import get_context
p = get_context("fork").Process(target=target, kwargs=kwarg)

 この方法の良いところは、いまどこまで終わったか、が明確に分かるため進捗状況が見やすいところです。一方、一度に作ったすべてのプロセスが完了してから再度プロセスを作るので、どれか1つだけ計算に時間がかかるような場合、他のプロセスがただ待機するだけの無駄な時間が発生してしまいます。
 サンプルコードでそうしているように、各パラメータで計算を実行するたびに開始・終了を print して進捗が確認できるようにしつつ、Pool で回すのが良いような気がします。

 ほとんど同じですが、実行用のコードも載せておきます。サンプルコードでは簡単のために num_pool の値を流用しています。

if __name__ == '__main__':
    num_proc = 4
    kwargs = [
        {'returned_dict':returned_dict, 'paraArray':paraArray, 'paraNum':paraNum, 'tol':tol}
        for paraNum in range(len(paraArray))
        ]
    parallel_Process(target_func, kwargs, num_proc)

結びに

 せっかくなので「並列化して早くなりますよ!」という計算時間の比較でも載せたかったのですが、計算がシンプルすぎるのか? あまり早くはなりませんでした。こういうときはひとつひとつの計算のcpu負荷を重くすれば良いので、tol を 1e-8 に小さくしてみました。

tol = 1e-8 のときの paraArray の大きさ(横軸)と計算時間/秒(縦軸)

 青い線が並列計算、オレンジの線が forループ計算の結果です。
 paraArray の大きさが num_pool = 4 のオーダーのときには分けたことでより時間がかかっていますが、十分大きい数では並列化で高速になっていることが分かります。

 ついでに、num_pool を変えたときの計算時間も見てみました。条件は tol = 1e-8、paraArray の大きさは2000です。

num_pool(横軸)を変えたときの計算時間(縦軸)の変化

 計算をやった Mac の cpu_count() は 8 だったので、もう少し下がるかと思いましたが、4 ぐらいでもうサチっている感じですね。8 と言いつつも普段使いの状態で4つくらいは常にコアが動いているので、実質 4 以上はそんなに下がらないのかな?

 ということで Windowsくんにもやってもらいました。やっぱり 4 くらいでサチってそう。Windowsくんはつよつよで 32 くらいいけるはずなので、効率化がサチるのはむしろ入出力あたりの限界をたたいている感じがありそうです。

なんか図でか


 自分用の計算では1ヶ月かかるところが1日になったので、本気の並列計算はすごいんですよ、本当ですよ。みなさんはもっと重い計算を投げて、並列計算の良さを実感しましょう。そして、cpuをたくさんいじめましょう!
 では、今日はここまで。次回、gpuを利用した並列計算編でお会いしましょう。さようなら〜 ノシ


またね。
えへへ。



 

*1:物理の言葉で言うところの、外場のもとのポテンシャルを解いて相図を作るってやつです。

*2:res.x の x は別に変数名を x で与えたからとかじゃないです。いつでも x 。

*3:returned_dict は辞書型変数で、 str(returned_dict) で中を見ると、 {"0":n0, "1":n1, ......} の形になっていることが分かります。複数変数を扱っているときには "returned_dict[num1, num2] = (n, m, l)" のようにタプルを入れてやることもできて、これは {(0,0):(n,m,l), ......} となっており、例えば returned_dict[0, 0][0] が n を与えます。 returned_dict 自体は特殊な型になっていますが、(実行)のプログラムにあるように中身を移し替えれば、結果がプロットできます。

*4:外というのは、この後扱う並列化用の関数 parallel_xxx の引数 target に入れる関数(ここでは target_func)の外、つまり def target_func の中に入れていないということです。

*5:これ単純にエラーメッセージで検索したら、「パッケージはインポートして使おう!」みたいな初心者向けページが出てきまくって本当に大爆笑。ワハハ。何が腹立つって、この手の質問、初心者と思って解決法を分かっていない人が偉そうに「そんなこともできないの?」みたいな姿勢で返してるのが見つかるんですよねェ〜〜〜ッ。フザけてんのかてめェッ! 文章もちゃんと読めねェやつが回答者気取ってんじゃあないッ!

*6:後に引用したブログにあるように、threading は代わる代わる並行して処理しているもので「並行処理」であり「並列処理」とは違うっぽい。知らな〜い。

*7:*3 で触れたように、他変数への拡張も容易です。 res[i,j] = returned_dict[i, j] とするだけ。returned_dict[i,j] = (n,m,l) のようにタプルを入れた場合も、res[i,j,0] = returned_dict[i, j][0] などして取り出せます。