Python并行化執(zhí)行詳細(xì)解析

    目錄

    前言:

    并行編程比程序編程困難,除非正常編程需要?jiǎng)?chuàng)建大量數(shù)據(jù),計(jì)算耗時(shí)太長,物理行為模擬困難

    例子:N體問題

    物理前提:

    • 牛頓定律
    • 時(shí)間離散運(yùn)動(dòng)方程

    普通計(jì)算方法

    import numpy as npimport timeimport matplotlib.pyplot as pltfrom mpl_toolkits.mplot3d import Axes3DNs = [2**i for i in range(1,10)]runtimes = []def remove_i(x,i):    "從所有粒子中去除本粒子"    shape = (x.shape[0]-1,)+x.shape[1:]    y = np.empty(shape,dtype=float)    y[:i] = x[:i]    y[i:] = x[i+1:]    return y def a(i,x,G,m):    "計(jì)算加速度"    x_i = x[i]    x_j = remove_i(x,i)    m_j = remove_i(m,i)    diff = x_j - x_i    mag3 = np.sum(diff**2,axis=1)**1.5    result = G * np.sum(diff * (m_j / mag3)[:,np.newaxis],axis=0)    return resultdef timestep(x0,v0,G,m,dt):    N = len(x0)    x1 = np.empty(x0.shape,dtype=float)    v1 = np.empty(v0.shape,dtype=float)    for i in range(N):        a_i0 = a(i,x0,G,m)        v1[i] = a_i0 * dt + v0[i]        x1[i] = a_i0 * dt**2 + v0[i] * dt + x0[i]    return x1,v1 def initial_cond(N,D):    x0 = np.array([[1,1,1],[10,10,10]])    v0 = np.array([[10,10,1],[0,0,0]])    m = np.array([10,10])    return x0,v0,mdef stimulate(N,D,S,G,dt):    fig = plt.figure()    ax = Axes3D(fig)    x0,v0,m = initial_cond(N,D)    for s in range(S):        x1,v1 = timestep(x0,v0,G,m,dt)        x0,v0 = x1,v1        t = 0        for i in x0:            ax.scatter(i[0],i[1],i[2],label=str(s*dt),c=["black","green","red"][t])            t += 1        t = 0    plt.show()start = time.time()stimulate(2,3,3000,9.8,1e-3)stop = time.time()runtimes.append(stop - start)

    效果圖

    Python 并行化執(zhí)行

    首先我們給出一個(gè)可以用來寫自己得并行化程序得,額,一串代碼

    import datetimeimport multiprocessing as mp def accessional_fun():    f = open("accession.txt","r")    result = float(f.read())    f.close()    return result def final_fun(name, param):    result = 0    for num in param:        result += num + accessional_fun() * 2    return {name: result}if __name__ == '__main__':    start_time = datetime.datetime.now()    num_cores = int(mp.cpu_count())    print("你使用得計(jì)算機(jī)有: " + str(num_cores) + " 個(gè)核,當(dāng)然了,Intel 7 以上得要除以2")    print("如果你使用得 Python 是 32 位得,注意數(shù)據(jù)量不要超過兩個(gè)G")    print("請(qǐng)你再次檢查你得程序是否已經(jīng)改成了適合并行運(yùn)算得樣子")    pool = mp.Pool(num_cores)    param_dict = {'task1': list(range(10, 300)),                  'task2': list(range(300, 600)),                  'task3': list(range(600, 900)),                  'task4': list(range(900, 1200)),                  'task5': list(range(1200, 1500)),                  'task6': list(range(1500, 1800)),                  'task7': list(range(1800, 2100)),                  'task8': list(range(2100, 2400)),                  'task9': list(range(2400, 2700)),                  'task10': list(range(2700, 3000))}    results = [pool.apply_async(final_fun, args=(name, param)) for name, param in param_dict.items()]    results = [p.get() for p in results]    end_time = datetime.datetime.now()    use_time = (end_time - start_time).total_seconds()    print("多進(jìn)程計(jì)算 共消耗: " + "{:.2f}".format(use_time) + " 秒")    print(results)

    運(yùn)行結(jié)果:如下:

    accession.txt 里得內(nèi)容是2.5     這就是一個(gè)累加得問題,每次累加得時(shí)候都會(huì)讀取文件中得2.5

    如果需要運(yùn)算得問題是類似于累加得問題,也就是可并行運(yùn)算得問題,那么才好做出并行運(yùn)算得改造

    再舉一個(gè)例子

    import mathimport timeimport multiprocessing as mpdef final_fun(name, param):    result = 0    for num in param:        result += math.cos(num) + math.sin(num)    return {name: result}if __name__ == '__main__':    start_time = time.time()    num_cores = int(mp.cpu_count())    print("你使用得計(jì)算機(jī)有: " + str(num_cores) + " 個(gè)核,當(dāng)然了,Intel 7 以上得要除以2")    print("如果你使用得 Python 是 32 位得,注意數(shù)據(jù)量不要超過兩個(gè)G")    print("請(qǐng)你再次檢查你得程序是否已經(jīng)改成了適合并行運(yùn)算得樣子")    pool = mp.Pool(num_cores)    param_dict = {'task1': list(range(10, 3000000)),                  'task2': list(range(3000000, 6000000)),                  'task3': list(range(6000000, 9000000)),                  'task4': list(range(9000000, 12000000)),                  'task5': list(range(12000000, 15000000)),                  'task6': list(range(15000000, 18000000)),                  'task7': list(range(18000000, 21000000)),                  'task8': list(range(21000000, 24000000)),                  'task9': list(range(24000000, 27000000)),                  'task10': list(range(27000000, 30000000))}    results = [pool.apply_async(final_fun, args=(name, param)) for name, param in param_dict.items()]    results = [p.get() for p in results]    end_time = time.time()    use_time = end_time - start_time    print("多進(jìn)程計(jì)算 共消耗: " + "{:.2f}".format(use_time) + " 秒")    result = 0    for i in range(0,10):        result += results[i].get("task"+str(i+1))    print(result)    start_time = time.time()    result = 0    for i in range(10,30000000):        result += math.cos(i) + math.sin(i)    end_time = time.time()    print("單進(jìn)程計(jì)算 共消耗: " + "{:.2f}".format(end_time - start_time) + " 秒")    print(result)

    運(yùn)行結(jié)果:

    力學(xué)問題改進(jìn):

    import numpy as npimport timefrom mpi4py import MPIfrom mpi4py.MPI import COMM_WORLDfrom types import FunctionTypefrom matplotlib import pyplot as pltfrom multiprocessing import Pooldef remove_i(x,i):    shape = (x.shape[0]-1,) + x.shape[1:]    y = np.empty(shape,dtype=float)    y[:1] = x[:1]    y[i:] = x[i+1:]    return ydef a(i,x,G,m):    x_i = x[i]    x_j = remove_i(x,i)    m_j = remove_i(m,i)    diff = x_j - x_i    mag3 = np.sum(diff**2,axis=1)**1.5    result = G * np.sum(diff * (m_j/mag3)[:,np.newaxis],axis=0)    return result def timestep(x0,v0,G,m,dt,pool):    N = len(x0)    takes = [(i,x0,v0,G,m,dt) for i in range(N)]    results = pool.map(timestep_i,takes)    x1 = np.empty(x0.shape,dtype=float)    v1 = np.empty(v0.shape,dtype=float)    for i,x_i1,v_i1 in results:        x1[i] = x_i1        v1[i] = v_i1    return x1,v1def timestep_i(args):    i,x0,v0,G,m,dt = args    a_i0 = a(i,x0,G,m)    v_i1 = a_i0 * dt + v0[i]    x_i1 = a_i0 * dt ** 2 +v0[i]*dt + x0[i]    return i,x_i1,v_i1def initial_cond(N,D):    x0 = np.random.rand(N,D)    v0 = np.zeros((N,D),dtype=float)    m = np.ones(N,dtype=float)    return x0,v0,mclass Pool(object):    def __init__(self):        self.f = None        self.P = COMM_WORLD.Get_size()        self.rank = COMM_WORLD.Get_rank()    def wait(self):        if self.rank == 0:            raise RuntimeError("Proc 0 cannot wait!")        status = MPI.Status()        while True:            task = COMM_WORLD.recv(source=0,tag=MPI.ANY_TAG,status=status)            if not task:                break            if isinstance(task,FunctionType):                self.f = task                continue            result = self.f(task)            COMM_WORLD.isend(result,dest=0,tag=status.tag)    def map(self,f,tasks):        N = len(tasks)        P = self.P        Pless1 = P - 1        if self.rank != 0:            self.wait()            return        if f is not self.f:            self.f = f            requests = []            for p in range(1,self.P):                r = COMM_WORLD.isend(f,dest=p)                requests.append(r)            MPI.Request.waitall(requests)            results = []            for i in range(N):                result = COMM_WORLD.recv(source=(i%Pless1)+1,tag=i)                results.append(result)            return results    def __del__(self):        if self.rank == 0:            for p in range(1,self.p):                COMM_WORLD.isend(False,dest=p)def simulate(N,D,S,G,dt):    x0,v0,m = initial_cond(N,D)    pool = Pool()    if COMM_WORLD.Get_rank()==0:        for s in range(S):            x1,v1 = timestep(x0,v0,G,m,dt,pool)            x0,v0 = x1,v1        else:            pool.wait()if __name__ == '__main__':    simulate(128,3,300,1.0,0.001)Ps = [1,2,4,8]runtimes = []for P in Ps:    start = time.time()    simulate(128,3,300,1.0,0.001)    stop = time.time()    runtimes.append(stop - start)print(runtimes)

    到此這篇關(guān)于Python 并行化執(zhí)行詳細(xì)解析得內(nèi)容就介紹到這了,更多相關(guān)Python 并行化執(zhí)行內(nèi)容請(qǐng)搜索之家以前得內(nèi)容或繼續(xù)瀏覽下面得相關(guān)內(nèi)容希望大家以后多多支持之家!

    聲明:所有內(nèi)容來自互聯(lián)網(wǎng)搜索結(jié)果,不保證100%準(zhǔn)確性,僅供參考。如若本站內(nèi)容侵犯了原著者的合法權(quán)益,可聯(lián)系我們進(jìn)行處理。
    發(fā)表評(píng)論
    更多 網(wǎng)友評(píng)論1 條評(píng)論)
    暫無評(píng)論

    返回頂部

    主站蜘蛛池模板: 波霸影院一区二区| 99偷拍视频精品一区二区| 精品无码国产一区二区三区51安| 亚洲一区二区精品视频| 国产一区二区福利| 亚洲日本一区二区一本一道| 无码精品人妻一区二区三区中| 久久久一区二区三区| 日本一区二区免费看| 本免费AV无码专区一区| 国内精品一区二区三区在线观看| 麻豆文化传媒精品一区二区| 亚洲中文字幕无码一区二区三区| 国产人妖视频一区二区破除| 亚洲av无码一区二区三区人妖| 国产精品一区二区久久精品涩爱| 久久精品国产第一区二区三区| 无码一区二区三区| 亚洲电影国产一区| 亚洲一区精品视频在线| 国产精品自拍一区| 色欲精品国产一区二区三区AV| 中文字幕在线一区| 久久精品国产一区| 日韩亚洲一区二区三区| 久久免费区一区二区三波多野| 亚洲AV综合色区无码一区爱AV| 秋霞鲁丝片一区二区三区| 国产综合无码一区二区色蜜蜜| 国产对白精品刺激一区二区| 亚洲AV无码一区二区二三区软件| 成人乱码一区二区三区av| 国产一区二区三区免费观在线| 久久se精品一区精品二区国产| 日韩福利视频一区| 国产精品va一区二区三区| 亚洲国产成人久久综合一区77| 国产品无码一区二区三区在线蜜桃| 国产在线无码一区二区三区视频| 亚洲国产老鸭窝一区二区三区| 亚洲AV无码一区二区三区网址|