Juliaで並列処理

juliaではBase.Threadsモジュールが標準で提供されていて、簡単にマルチスレッド処理を実装することができます。 ここでは、次の2つのマクロを紹介します。

  • @threads
  • @spawn

@threads
for文を渡すと、マルチスレッド化してくれます。

using Base.Threads

n = 1000000
x = randn(n)
y = randn(n)
z = zeros(n)

# マルチスレッドで実行
@threads for i in 1:length(z)
    z[i] = x[i] * y[i]
end

マルチスレッドを利用するには実行時オプション-threads nで設定する必要があります。 実行可能なスレッド数を確認するにはThreads.nthreds()メソッドを呼び出すとスレッド数が返ります。


@spawn
マクロに続く式を実行するタスク*1を生成して、空きスレッドで実行します。

for i in 1:10
    task = @spawn some_work()
end

@spawnを付与した関数や式はtask(Task型)を返します。@spawnで生成したタスクは、非同期で実行されます。 タスク化したものが返り値を返す場合はfetch関数が便利です。fetchを使うと、タスクが完了するまで待機させることができます。 ただ、IOをブロックしてしまうので、ブロックさせたくない場合はBase.@asyncでそれ自体を別タスクで非同期実行する必要があります。


サンプルコード

複数の音声データをダウンロードする処理を、マルチスレッドかつマルチタスクで実行するコードを書いてみました。

# download.jl
using HTTP
using Base.Threads


# 指定のURLからファイルをダウンロードする
function download(url, timeout=180)
    filename = basename(url)
    try
        @info "threadid=$(threadid()): download start $(filename)"
        res = HTTP.get(url, readtimeout=timeout)
        @info "threadid=$(threadid()): download finished $(filename)"
        return (filename=filename, filecontent=res.body)
    catch e
        @error e
        return
    end
end


function main()
    urls = [
        "https://archive.org/download/ThePianoMusicOfMauriceRavel/01PavanePourUneInfanteDfuntePourPianoMr19.mp3",
        "https://archive.org/download/ThePianoMusicOfMauriceRavel/02JeuxDeauPourPianoMr30.mp3",
        "https://archive.org/download/ThePianoMusicOfMauriceRavel/03SonatinePourPianoMr40-Modr.mp3",
        "https://archive.org/download/ThePianoMusicOfMauriceRavel/04MouvementDeMenuet.mp3",
        "https://archive.org/download/ThePianoMusicOfMauriceRavel/05Anim.mp3",
    ]
    tasks = Task[]
    for url in urls
        # 逐次ダウンロードを開始する
        task = @spawn download(url)
        push!(tasks, task)
    end

    @sync for task in tasks
        @async begin
            result = fetch(task)
            result === nothing && ErrorException("download is failed.")
            path = joinpath(dirname(@__FILE__), "tmp", result.filename)
            try
                open(path, "w") do file
                    write(file, result.filecontent)
                end
            catch e
                error(e)
            end
        end
    end
end

main()
  • @__FILE__は実行ファイルパスを取得するマクロ
  • begin ~ endは複数の式を一つのブロック化するときに役立ちます。スコープは作らないので、ローカルスコープが欲しい場合はlet ~ endを使います。
  • @async ~begin~endブロックを非同期実行します。@syncで全体をラップすることで、@asyncで非同期実行された式がすべて完了するまでfor~endのスコープを抜けるのを待つことができます。

上記コードをスレッド数を指定して上記スクリプトを実行するには julia --threads 4 download.jl と実行すれば、マルチスレッド&マルチタスクでファイルダウンロードが実行されます. この例は、非同期実行すれば十分な内容ではあると思いますが、@spawnを使うと簡単に非同期かつマルチスレッドな実行が実現できるということがわかります。