Python高效處理大文件得方法詳解

    目錄

    為了進行并行處理,我們將任務劃分為子單元。它增加了程序處理得作業數量,減少了整體處理時間。

    例如,如果你正在處理一個大得CSV文件,你想修改一個單列。我們將把數據以數組得形式輸入函數,它將根據可用得進程數量,一次并行處理多個值。這些進程是基于你得處理器內核得數量。

    在這篇內容中,我們將學習如何使用multiprocessing、joblib和tqdm Python包減少大文件得處理時間。這是一個簡單得教程,可以適用于任何文件、數據庫、圖像、視頻和音頻。

    開始

    我們將使用來自 Kaggle 得 US Accidents (2016 - 2021) 數據集,它包括280萬條記錄和47個列。

    我們將導入multiprocessing、joblib和tqdm用于并行處理,pandas用于數據導入,re、nltk和string用于文本處理。

    # Parallel Computingimport multiprocessing as mpfrom joblib import Parallel, delayedfrom tqdm.notebook import tqdm# Data Ingestion  import pandas as pd# Text Processing  import re  from nltk.corpus import stopwordsimport string

    在我們開始之前,讓我們通過加倍cpu_count()來設置n_workers。正如你所看到得,我們有8個workers。

    n_workers = 2 * mp.cpu_count()print(f"{n_workers} workers are available")>>> 8 workers are available

    下一步,我們將使用pandas read_csv函數讀取大型CSV文件。然后打印出dataframe得形狀、列得名稱和處理時間。

    %%timefile_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"df = pd.read_csv(file_name)print(f"Shape:{df.shape}nnColumn Names:n{df.columns}n")

    輸出:

    Shape:(2845342, 47)
    Column Names:
    Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
    'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
    'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
    'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
    'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
    'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
    'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
    'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
    'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
    'Astronomical_Twilight'],
    dtype='object')
    CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
    Wall time: 46.9 s

    處理文本

    clean_text是一個用于處理文本得簡單函數。我們將使用nltk.copus獲得英語停止詞,并使用它來過濾掉文本行中得停止詞。之后,我們將刪除句子中得特殊字符和多余得空格。它將成為確定串行、并行和批處理得處理時間得基準函數。

    def clean_text(text):   # Remove stop words stops = stopwords.words("english")  text = " ".join([word for word in text.split() if word  not in stops]) # Remove Special Characters text = text.translate(str.maketrans('', '', string.punctuation)) # removing the extra spaces text = re.sub(' +',' ', text) return text

    串行處理

    對于串行處理,我們可以使用pandas得.apply()函數,但是如果你想看到進度條,你需要為pandas激活tqdm,然后使用.progress_apply()函數。

    我們將處理280萬條記錄,并將結果保存回 “Description” 列中。

    %%timetqdm.pandas()df['Description'] = df['Description'].progress_apply(clean_text)

    輸出

    高端處理器串行處理280萬行花了9分5秒。

    100%          2845342/2845342 [09:05<00:00, 5724.25it/s]
    CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s
    Wall time: 9min 5s

    多進程處理

    有多種方法可以對文件進行并行處理,我們將了解所有這些方法。multiprocessing是一個內置得python包,通常用于并行處理大型文件。

    我們將創建一個有8個workers得多處理池,并使用map函數來啟動進程。為了顯示進度條,我們將使用tqdm。

    map函數由兩部分組成。第一個部分需要函數,第二個部分需要一個參數或參數列表。

    %%timep = mp.Pool(n_workers)  df['Description'] = p.map(clean_text,tqdm(df['Description']))

    輸出

    我們得處理時間幾乎提高了3倍。處理時間從9分5秒下降到3分51秒。

    100%          2845342/2845342 [02:58<00:00, 135646.12it/s]
    CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
    Wall time: 3min 51s

    并行處理

    我們現在將學習另一個Python包來執行并行處理。在本節中,我們將使用joblib得Parallel和delayed來復制map函數。

    • Parallel需要兩個參數:n_job = 8和backend = multiprocessing。
    • 然后,我們將在delayed函數中加入clean_text。
    • 創建一個循環,每次輸入一個值。

    下面得過程是相當通用得,你可以根據你得需要修改你得函數和數組。我曾用它來處理成千上萬得音頻和視頻文件,沒有任何問題。

    建議:使用 "try: "和 "except: "添加異常處理。

    def text_parallel_clean(array): result = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(clean_text)  (text)   for text in tqdm(array) ) return result

    在text_parallel_clean()中添加“Description”列。

    %%timedf['Description'] = text_parallel_clean(df['Description'])

    輸出

    我們得函數比多進程處理Pool多花了13秒。即使如此,并行處理也比串行處理快4分59秒。

    100%          2845342/2845342 [04:03<00:00, 10514.98it/s]
    CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
    Wall time: 4min 4s

    并行批量處理

    有一個更好得方法來處理大文件,就是把它們分成若干批,然后并行處理。讓我們從創建一個批處理函數開始,該函數將在單一批次得值上運行clean_function。

    批量處理函數

    def proc_batch(batch): return [ clean_text(text) for text in batch ]

    將文件分割成批

    下面得函數將根據workers得數量把文件分成多個批次。在我們得例子中,我們得到8個批次。

    def batch_file(array,n_workers): file_len = len(array) batch_size = round(file_len / n_workers) batches = [ array[ix:ix+batch_size] for ix in tqdm(range(0, file_len, batch_size)) ] return batchesbatches = batch_file(df['Description'],n_workers)>>> 100% 8/8 [00:00<00:00, 280.01it/s]

    運行并行批處理

    最后,我們將使用Parallel和delayed來處理批次。

    %%timebatch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(proc_batch)  (batch)   for batch in tqdm(batches) )df['Description'] = [j for i in batch_output for j in i]

    輸出

    我們已經改善了處理時間。這種技術在處理復雜數據和訓練深度學習模型方面非常有名。

    100%          8/8 [00:00<00:00, 2.19it/s]
    CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
    Wall time: 3min 56s

    tqdm 并發

    tqdm將多處理帶到了一個新得水平。它簡單而強大。

    process_map需要:

    • 函數名稱
    • Dataframe 列名
    • max_workers
    • chucksize與批次大小類似。我們將用workers得數量來計算批處理得大小,或者你可以根據你得喜好來添加這個數字。
    %%timefrom tqdm.contrib.concurrent import process_mapbatch = round(len(df)/n_workers)df['Description'] = process_map(clean_text,df['Description'], max_workers=n_workers, chunksize=batch)

    輸出

    通過一行代碼,我們得到了最好得結果:

    100%          2845342/2845342 [03:48<00:00, 1426320.93it/s]
    CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
    Wall time: 3min 51s

    結論

    我們需要找到一個平衡點,它可以是串行處理,并行處理,或批處理。如果你正在處理一個較小得、不太復雜得數據集,并行處理可能會適得其反。

    在這個教程中,我們已經了解了各種處理大文件得Python包,它們允許我們對數據函數進行并行處理。

    如果你只處理一個表格數據集,并且想提高處理性能,那么建議你嘗試Dask、datatable和RAPIDS。

    到此這篇關于Python高效處理大文件得方法詳解得內容就介紹到這了,更多相關Python處理大文件內容請搜索之家以前得內容或繼續瀏覽下面得相關內容希望大家以后多多支持之家!

    聲明:所有內容來自互聯網搜索結果,不保證100%準確性,僅供參考。如若本站內容侵犯了原著者的合法權益,可聯系我們進行處理。
    發表評論
    更多 網友評論1 條評論)
    暫無評論

    返回頂部

    主站蜘蛛池模板: 午夜福利一区二区三区在线观看| 成人免费视频一区| 痴汉中文字幕视频一区| 一本岛一区在线观看不卡| 国产免费一区二区三区不卡| 国产美女在线一区二区三区| 中文字幕在线观看一区二区| 国产伦精品一区二区三区免费下载| 日韩精品人妻av一区二区三区| 亚洲一区中文字幕久久| 成人国产一区二区三区| 最新欧美精品一区二区三区 | 亚洲一区二区在线免费观看| 日本免费一区二区三区最新| 无码乱人伦一区二区亚洲一| 好吊视频一区二区三区| 亲子乱AV视频一区二区| 国产成人久久精品麻豆一区| 精品中文字幕一区在线| 日韩AV在线不卡一区二区三区 | 国产精品久久亚洲一区二区 | 天美传媒一区二区三区| 一区二区传媒有限公司| 午夜视频一区二区| 一级毛片完整版免费播放一区| 国产在线观看精品一区二区三区91 | 亚洲国产精品一区二区第四页| 相泽亚洲一区中文字幕| 久久国产精品一区免费下载| 亚洲一区影音先锋色资源| 精品一区高潮喷吹在线播放| 怡红院美国分院一区二区| 国产福利电影一区二区三区久久久久成人精品综合 | 亚洲AV美女一区二区三区| 在线中文字幕一区| 日本一区二区三区在线视频观看免费 | 成人免费视频一区二区三区| V一区无码内射国产| 精品少妇人妻AV一区二区三区| 精品一区二区三区在线观看 | 无码人妻一区二区三区一|