Xử lý song song là một chế độ hoạt động trong đó tác vụ được thực hiện đồng thời trong nhiều bộ xử lý trong cùng một máy tính. Nó có nghĩa là để giảm thời gian xử lý tổng thể. Trong hướng dẫn này, bạn sẽ hiểu quy trình để song song hóa bất kỳ logic điển hình nào bằng mô -đun đa xử lý Python.
1. Giới thiệu
Xử lý song song là một chế độ hoạt động trong đó tác vụ được thực hiện đồng thời trong nhiều bộ xử lý trong cùng một máy tính. Nó có nghĩa là để giảm thời gian xử lý tổng thể.
Tuy nhiên, thường có một chút chi phí khi giao tiếp giữa các quá trình thực sự có thể tăng thời gian tổng thể được thực hiện cho các nhiệm vụ nhỏ thay vì giảm nó.
Trong Python, mô -đun
import numpy as np
from time import time
# Prepare data
np.random.RandomState[100]
arr = np.random.randint[0, 10, size=[200000, 5]]
data = arr.tolist[]
data[:5]
8 được sử dụng để chạy các quy trình song song độc lập bằng cách sử dụng các quy trình con [thay vì các luồng].Nó cho phép bạn tận dụng nhiều bộ xử lý trên máy [cả Windows và UNIX], có nghĩa là, các quy trình có thể được chạy ở các vị trí bộ nhớ hoàn toàn riêng biệt. Đến cuối hướng dẫn này, bạn sẽ biết:
- Làm thế nào để cấu trúc mã và hiểu cú pháp để cho phép xử lý song song bằng cách sử dụng
8?import numpy as np from time import time # Prepare data np.random.RandomState[100] arr = np.random.randint[0, 10, size=[200000, 5]] data = arr.tolist[] data[:5]
- Làm thế nào để thực hiện xử lý song song đồng bộ và không đồng bộ?
- Làm thế nào để song song hóa một gấu trúc DataFrame?
- Giải quyết 3 lần sử dụng khác nhau với giao diện
5 [được viết bên dưới] để kiểm tra xem có bao nhiêu số nằm trong phạm vi và trả về số lượng.# Solution Without Paralleization def howmany_within_range[row, minimum, maximum]: """Returns how many numbers lie within `maximum` and `minimum` in a given `row`""" count = 0 for n in row: if minimum [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
Nhận khóa học Python hoàn thành miễn phí
Đối mặt với tình huống tương tự như mọi người khác?
Xây dựng sự nghiệp khoa học dữ liệu của bạn với trình độ được công nhận trên toàn cầu, được công nghiệp phê duyệt. Có được suy nghĩ, sự tự tin và các kỹ năng làm cho nhà khoa học dữ liệu trở nên có giá trị.
Nhận khóa học Python hoàn thành miễn phí
Xây dựng sự nghiệp khoa học dữ liệu của bạn với trình độ được công nhận trên toàn cầu, được công nghiệp phê duyệt. Có được suy nghĩ, sự tự tin và các kỹ năng làm cho nhà khoa học dữ liệu trở nên có giá trị.
9 và# Solution Without Paralleization def howmany_within_range[row, minimum, maximum]: """Returns how many numbers lie within `maximum` and `minimum` in a given `row`""" count = 0 for n in row: if minimum [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
5 với# Parallelizing using Pool.map[] import multiprocessing as mp # Redefine, with only 1 mandatory argument. def howmany_within_range_rowonly[row, minimum=4, maximum=8]: count = 0 for n in row: if minimum [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
9 và# Parallelizing using Pool.apply[] import multiprocessing as mp # Step 1: Init multiprocessing.Pool[] pool = mp.Pool[mp.cpu_count[]] # Step 2: `pool.apply` the `howmany_within_range[]` results = [pool.apply[howmany_within_range, args=[row, 4, 8]] for row in data] # Step 3: Don't forget to close pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
# Parallelizing using Pool.map[] import multiprocessing as mp # Redefine, with only 1 mandatory argument. def howmany_within_range_rowonly[row, minimum=4, maximum=8]: count = 0 for n in row: if minimum [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
5.2. Song song bằng cách sử dụng pool.map []
7 bằng cách đặt mặc định thành các tham số# Solution Without Paralleization def howmany_within_range[row, minimum, maximum]: """Returns how many numbers lie within `maximum` and `minimum` in a given `row`""" count = 0 for n in row: if minimum [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
8 và# Parallelizing with Pool.starmap[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = pool.starmap[howmany_within_range, [[row, 4, 8] for row in data]] pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
9 để tạo hàm# Parallelizing with Pool.starmap[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = pool.starmap[howmany_within_range, [[row, 4, 8] for row in data]] pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
9 ngoại trừ việc bạn cần cung cấp chức năng gọi lại cho biết các kết quả được tính toán nên được lưu trữ như thế nào.# Parallel processing with Pool.apply_async[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] # Step 1: Redefine, to accept `i`, the iteration number def howmany_within_range2[i, row, minimum, maximum]: """Returns how many numbers lie within `maximum` and `minimum` in a given `row`""" count = 0 for n in row: if minimum [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
Tuy nhiên, một cảnh báo với
0 là, thứ tự các số trong kết quả bị xáo trộn cho thấy các quy trình không hoàn thành theo thứ tự nó được bắt đầu.# Parallel processing with Pool.apply_async[] without callback function import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] # call apply_async[] without callback result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]] # result_objects is a list of pool.ApplyResult objects results = [r.get[][1] for r in result_objects] pool.close[] pool.join[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
Một cách giải quyết cho điều này là, chúng tôi xác định lại một
6 mới để chấp nhận và trả về số lặp [# Parallel processing with Pool.apply_async[] without callback function import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] # call apply_async[] without callback result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]] # result_objects is a list of pool.ApplyResult objects results = [r.get[][1] for r in result_objects] pool.close[] pool.join[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
7] và sau đó sắp xếp các kết quả cuối cùng.# Parallel processing with Pool.apply_async[] without callback function import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] # call apply_async[] without callback result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]] # result_objects is a list of pool.ApplyResult objects results = [r.get[][1] for r in result_objects] pool.close[] pool.join[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
0 mà không cung cấp chức năng# Parallel processing with Pool.apply_async[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] # Step 1: Redefine, to accept `i`, the iteration number def howmany_within_range2[i, row, minimum, maximum]: """Returns how many numbers lie within `maximum` and `minimum` in a given `row`""" count = 0 for n in row: if minimum [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
9.# Parallel processing with Pool.apply_async[] without callback function import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] # call apply_async[] without callback result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]] # result_objects is a list of pool.ApplyResult objects results = [r.get[][1] for r in result_objects] pool.close[] pool.join[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
Chỉ có vậy, nếu bạn không cung cấp một cuộc gọi lại, thì bạn sẽ nhận được một danh sách các đối tượng
0 chứa các giá trị đầu ra được tính toán từ mỗi quy trình.# Parallelizing with Pool.starmap_async[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[] # With map, use `howmany_within_range_rowonly` instead # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[] pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
Từ đó, bạn cần sử dụng phương thức
1 để lấy kết quả cuối cùng mong muốn.# Parallelizing with Pool.starmap_async[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[] # With map, use `howmany_within_range_rowonly` instead # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[] pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
# Parallel processing with Pool.apply_async[] without callback function import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] # call apply_async[] without callback result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]] # result_objects is a list of pool.ApplyResult objects results = [r.get[][1] for r in result_objects] pool.close[] pool.join[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
6.2 song song với pool.starmap_async []
Bạn đã thấy cách
0 hoạt động.# Parallel processing with Pool.apply_async[] without callback function import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] # call apply_async[] without callback result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]] # result_objects is a list of pool.ApplyResult objects results = [r.get[][1] for r in result_objects] pool.close[] pool.join[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
Bạn có thể tưởng tượng và viết lên một phiên bản tương đương cho
3 và# Parallelizing with Pool.starmap_async[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[] # With map, use `howmany_within_range_rowonly` instead # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[] pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
4 không?# Parallelizing with Pool.starmap_async[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[] # With map, use `howmany_within_range_rowonly` instead # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[] pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
Việc thực hiện ở dưới dù sao.
# Parallelizing with Pool.starmap_async[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[] # With map, use `howmany_within_range_rowonly` instead # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[] pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
7. Làm thế nào để song song hóa một bản dữ liệu gấu trúc?
Cho đến nay, bạn đã thấy cách song song hóa một chức năng bằng cách làm cho nó hoạt động trong danh sách.
Nhưng khi làm việc trong phân tích dữ liệu hoặc các dự án học máy, bạn có thể muốn song song hóa các khung dữ liệu gấu trúc, là các đối tượng được sử dụng phổ biến nhất [bên cạnh các mảng Numpy] để lưu trữ dữ liệu bảng.
Khi nói đến song song với
5, bạn có thể thực hiện các chức năng song song để lấy làm tham số đầu vào:# Parallelizing with Pool.starmap_async[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[] # With map, use `howmany_within_range_rowonly` instead # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[] pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
- Một hàng của DataFrame
- Một cột của DataFrame
- toàn bộ khung dữ liệu
2 đầu tiên có thể được thực hiện bằng cách sử dụng mô -đun
8.import numpy as np from time import time # Prepare data np.random.RandomState[100] arr = np.random.randint[0, 10, size=[200000, 5]] data = arr.tolist[] data[:5]
Nhưng đối với cái cuối cùng, đó là song song trên toàn bộ khung dữ liệu, chúng tôi sẽ sử dụng gói
7 sử dụng# Parallelizing with Pool.starmap_async[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[] # With map, use `howmany_within_range_rowonly` instead # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[] pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
8 để tuần tự hóa trong nội bộ.# Parallelizing with Pool.starmap_async[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[] # With map, use `howmany_within_range_rowonly` instead # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[] pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
Đầu tiên, hãy tạo một DataFrame mẫu và xem cách thực hiện các giao dịch thông minh và khôn ngoan về cột.
Một cái gì đó như sử dụng
9 trên hàm do người dùng xác định nhưng song song.# Parallelizing with Pool.starmap_async[] import multiprocessing as mp pool = mp.Pool[mp.cpu_count[]] results = [] results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[] # With map, use `howmany_within_range_rowonly` instead # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[] pool.close[] print[results[:10]] #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
import numpy as np import pandas as pd import multiprocessing as mp df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]] print[df.head[]] #> 0 1 #> 0 8 5 #> 1 5 3 #> 2 3 4 #> 3 4 4 #> 4 7 9
Chúng tôi có một khung dữ liệu. Hãy để áp dụng chức năng
0 trên mỗi hàng, nhưng chạy 4 quy trình cùng một lúc.import numpy as np import pandas as pd import multiprocessing as mp df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]] print[df.head[]] #> 0 1 #> 0 8 5 #> 1 5 3 #> 2 3 4 #> 3 4 4 #> 4 7 9
Để làm điều này, chúng tôi khai thác
1.import numpy as np import pandas as pd import multiprocessing as mp df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]] print[df.head[]] #> 0 1 #> 0 8 5 #> 1 5 3 #> 2 3 4 #> 3 4 4 #> 4 7 9
Bằng cách đặt
2, bạn đang chuyển từng hàng của DataFrame dưới dạng một bộ đơn giản cho hàmimport numpy as np import pandas as pd import multiprocessing as mp df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]] print[df.head[]] #> 0 1 #> 0 8 5 #> 1 5 3 #> 2 3 4 #> 3 4 4 #> 4 7 9
0.import numpy as np import pandas as pd import multiprocessing as mp df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]] print[df.head[]] #> 0 1 #> 0 8 5 #> 1 5 3 #> 2 3 4 #> 3 4 4 #> 4 7 9
0import numpy as np from time import time # Prepare data np.random.RandomState[100] arr = np.random.randint[0, 10, size=[200000, 5]] data = arr.tolist[] data[:5]
Đó là một ví dụ về song song hàng ngày.
Hãy để Lừa cũng làm một song song hóa cột.
Đối với điều này, tôi sử dụng
4 để chuyển toàn bộ cột làm chuỗi cho hàmimport numpy as np import pandas as pd import multiprocessing as mp df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]] print[df.head[]] #> 0 1 #> 0 8 5 #> 1 5 3 #> 2 3 4 #> 3 4 4 #> 4 7 9
5.import numpy as np import pandas as pd import multiprocessing as mp df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]] print[df.head[]] #> 0 1 #> 0 8 5 #> 1 5 3 #> 2 3 4 #> 3 4 4 #> 4 7 9
1import numpy as np from time import time # Prepare data np.random.RandomState[100] arr = np.random.randint[0, 10, size=[200000, 5]] data = arr.tolist[] data[:5]
Bây giờ đến phần thứ ba - song song hóa một hàm chấp nhận dữ liệu gấu trúc, mảng numpy, v.v ... Pathos theo kiểu
8 của: POOL> MAP> Đóng> Tham gia> Xóa.import numpy as np from time import time # Prepare data np.random.RandomState[100] arr = np.random.randint[0, 10, size=[200000, 5]] data = arr.tolist[] data[:5]
Kiểm tra các tài liệu Pathos để biết thêm thông tin.
2import numpy as np from time import time # Prepare data np.random.RandomState[100] arr = np.random.randint[0, 10, size=[200000, 5]] data = arr.tolist[] data[:5]
Cảm ơn NotSoprocoder vì sự đóng góp này dựa trên Pathos.
Nếu bạn đã quen thuộc với các khung dữ liệu gấu trúc nhưng muốn thực hành và làm chủ nó, hãy xem các bài tập gấu trúc này.
8. Bài tập
Bài 1: Sử dụng
8 và# Solution Without Paralleization def howmany_within_range[row, minimum, maximum]: """Returns how many numbers lie within `maximum` and `minimum` in a given `row`""" count = 0 for n in row: if minimum 0 1 #> 0 8 5 #> 1 5 3 #> 2 3 4 #> 3 4 4 #> 4 7 9
9. Useimport numpy as np import pandas as pd import multiprocessing as mp df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]] print[df.head[]] #> 0 1 #> 0 8 5 #> 1 5 3 #> 2 3 4 #> 3 4 4 #> 4 7 9
8 and# Solution Without Paralleization def howmany_within_range[row, minimum, maximum]: """Returns how many numbers lie within `maximum` and `minimum` in a given `row`""" count = 0 for n in row: if minimum 0 1 #> 0 8 5 #> 1 5 3 #> 2 3 4 #> 3 4 4 #> 4 7 9
9.import numpy as np import pandas as pd import multiprocessing as mp df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]] print[df.head[]] #> 0 1 #> 0 8 5 #> 1 5 3 #> 2 3 4 #> 3 4 4 #> 4 7 9
3import numpy as np from time import time # Prepare data np.random.RandomState[100] arr = np.random.randint[0, 10, size=[200000, 5]] data = arr.tolist[] data[:5]
Hiển thị giải pháp
4import numpy as np from time import time # Prepare data np.random.RandomState[100] arr = np.random.randint[0, 10, size=[200000, 5]] data = arr.tolist[] data[:5]
Bài 2: Sử dụng
# Solution Without Paralleization def howmany_within_range[row, minimum, maximum]: """Returns how many numbers lie within `maximum` and `minimum` in a given `row`""" count = 0 for n in row: if minimum