py

看板Marginalman作者 (溫水佳樹的兄長大人)時間1年前 (2024/09/28 19:01), 1年前編輯推噓1(100)
留言1則, 1人參與, 1年前最新討論串11/16 (看更多)
利用多進程實現簡單的分散式運算 process可以分散到多台機器運行 multiprocessing的manager可以實現這功能 他把queue丟到網路上並被其他機器讀取 py: import random, os, time from multiprocessing import Queue from multiprocessing.managers import BaseManager task_queue = Queue() result_queue = Queue() def get_task_queue(): return task_queue def get_result_queue(): return result_queue class QueueManager(BaseManager): pass QueueManager.register('get_task_queue', callable=get_task_queue) QueueManager.register('get_result_queue', callable=get_result_queue) manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') print(os.getpid()) s = manager.get_server() s.serve_forever() 這邊先建立伺服器管理任務 這邊分成四個步驟: 1.建立queue queue負責進程間的交流 2.將創建的queue丟到網上,callable是能調用的對象,其他機器可以透過網路直接 調用這函數,然後透過函數取得queue 3.建立manager,設定端口與密碼,密碼需要二進位,所以前面有個b 4.啟動伺服器 運行這段程式碼 其他機器就能連進伺服器 py: import random, time, os from multiprocessing import Queue from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') server_addr = '127.0.0.1' m = QueueManager(address=(server_addr, 5000), authkey=b'abc') m.connect() task = m.get_task_queue() result = m.get_result_queue() for i in range(10): n = random.randint(0, 10000) print(f'put task {n}') task.put(n) print('少女祈禱中') for i in range(10): try: r = result.get(timeout=10) print(f'result {r}') except Exception as e: print(e) print(os.getpid()) 這段程式碼的功能是產生資料,存入任務queue,等待,讀取運算結果 這邊是負責執行任務的機器 首先使用QueueManager註冊要調用的函數 第二步 連結伺服器 第三步 取得queue 第四步 執行任務 py: import time, queue, os from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') server_addr = '127.0.0.1' m = QueueManager(address=(server_addr, 5000), authkey=b'abc') m.connect() task = m.get_task_queue() result = m.get_result_queue() for i in range(10): try: n = task.get(timeout=1) print(f'run task {n}') r = f'{n} * {n} = {n * n}' time.sleep(1) result.put(r) except queue.Empty: print('task queue is empty.') print(os.getpid() ) print('worker exit.') 這段程式碼的功能是對task queue裡面的數字平方 完成後放入 result queue 大致步驟跟上面一樣 不贅述 透過這三段程式碼 我們就完成了一個簡單的分散式架構 我印象Celery dask之類的好像更常用 但我沒修過分散式系統 就請其他大師說明 另外有錯的部分歡迎指正 感謝 程式碼參考下面兩個網站 不過直接照抄跑不起來 所以我有另外修改 參考資料: https://liaoxuefeng.com/books/python/process-thread/process-manager/index.html https://docs.python.org/3.10/library/multiprocessing.html -- ※ 發信站: 批踢踢實業坊(ptt.cc), 來自: 223.136.68.7 (臺灣) ※ 文章網址: https://www.ptt.cc/bbs/Marginalman/M.1727521308.A.380.html ※ 編輯: sustainer123 (223.136.68.7 臺灣), 09/28/2024 19:05:55

09/28 21:11, 1年前 , 1F
大師..
09/28 21:11, 1F
文章代碼(AID): #1cz-8SE0 (Marginalman)
討論串 (同標題文章)
完整討論串 (本文為第 11 之 16 篇):
1
5
9月前, 03/09
2
3
1年前, 11/16
0
3
1年前, 11/12
2
2
1年前, 10/02
6
13
1
1
1年前, 09/28
1
1
2
6
0
6
1年前, 09/26
1
7
文章代碼(AID): #1cz-8SE0 (Marginalman)