* Producer & Consumer ( 생산자 & 소비자) 에 문제를 Python3.6 으로 작성해 보았다.
Asyncio (비동기) 작성되었으며, Message queue 인 ZeroMQ 를 사용하여 Producer 이하 (p) 가 Consumer 이하 (c) 에게 원하는 Message (broadcasting) 를 전달 후 각 c 들이 data 를 가져와 처리하는 예제 code 이다.
# 실행 ( pub )
import asyncio
import sys, signal
import zmq
import zmq.asyncio
kill = False
def bind_zmq(port):
print('try bind localhost %d' % port)
context = zmq.asyncio.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:%s" % port)
return socket
async def pub_func():
socket = bind_zmq(10010)
count = 1
await asyncio.sleep(1)
while True:
data = "'message %d'" % count
print('send_msg %s' % data)
await socket.send_string(data)
count = count + 1
if kill: break
await asyncio.sleep(0.1)
socket.close()
async def async_task():
fts = [asyncio.ensure_future(pub_func())]
for f in asyncio.as_completed(fts):
await f
def signal_handler(signal, frame):
print('signal num %d' % signal)
global kill
kill = True
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_task())
loop.close()
# 실행 ( sub )
import asyncio
import sys, signal
import zmq
import zmq.asyncio
import random
def connect_zmq(port):
context = zmq.asyncio.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:%s" % port)
socket.setsockopt_string(zmq.SUBSCRIBE, '')
# socket.setsockopt(zmq.SUBSCRIBE, b'')
return socket
async def sub_func(que):
print('call sub_func')
socket = connect_zmq(10010)
while True:
sub_msg = await socket.recv_string()
# print('sub_msg %s in sub_func' % sub_msg)
qs = que.qsize()
if qs >= 90:
print('warning que size over 90')
if qs >= 98:
print('dont inque')
continue
que.put_nowait(sub_msg)
socket.close()
async def proc_func(wk_id, que):
print('call wk_id %d proc_func' % wk_id)
while True:
que_data = await que.get()
slp_tm = random.uniform(0.3, 1.0)
print('wk_id %d, que_data %s, slp_tm %f in proc_func' % (wk_id, que_data, slp_tm))
await asyncio.sleep(slp_tm)
async def async_tasks():
que = asyncio.Queue(maxsize = 100)
# fts = [asyncio.ensure_future(sub_func(que)),
fts = [asyncio.ensure_future(proc_func(wk_id, que))
for wk_id in range(1,4)
]
fts.append(asyncio.ensure_future(sub_func(que)))
for f in asyncio.as_completed(fts):
await f
await que.join()
def signal_handler(signal, frame):
print('signal num %d' % signal)
sys.exit(1)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
loop = asyncio.get_event_loop()
loop.run_until_complete(async_tasks())
loop.close()
code 는 위와 같다. local address 의 10010 port 를 사용하며 pub code 가 message 값을 1씩 증가시키면서 publish 를 하게 되면 sub code 는 n개의 ensure_future object 로 publish 된 data 를 subscribe 하여 data 를 처리하는 방식이다. n번째의 ensure_future 객체 는 random 값으로 asyncio.sleep 를 각기 다르게 주기 때문에 어떤 객체부터 끝나는지 알수 없게 된다.
아래는 코드의 결과 이다.
# Pub
send_msg 'message 87'
send_msg 'message 88'
send_msg 'message 89'
send_msg 'message 90'
send_msg 'message 91'
send_msg 'message 92'
send_msg 'message 93'
send_msg 'message 94'
send_msg 'message 95'
send_msg 'message 96'
send_msg 'message 97'
send_msg 'message 98'
send_msg 'message 99'
send_msg 'message 100'
send_msg 'message 101'
send_msg 'message 102'
send_msg 'message 103'
send_msg 'message 104'
send_msg 'message 105'
send_msg 'message 106'
send_msg 'message 107'
send_msg 'message 108'
send_msg 'message 109'
send_msg 'message 110'
send_msg 'message 111'
# Sub
call wk_id 1 proc_func
call wk_id 2 proc_func
call wk_id 3 proc_func
call sub_func
wk_id 1, que_data 'message 90', slp_tm 0.405087 in proc_func
wk_id 2, que_data 'message 91', slp_tm 0.699106 in proc_func
wk_id 3, que_data 'message 92', slp_tm 0.806175 in proc_func
wk_id 1, que_data 'message 93', slp_tm 0.977847 in proc_func
wk_id 2, que_data 'message 94', slp_tm 0.343587 in proc_func
wk_id 3, que_data 'message 95', slp_tm 0.348311 in proc_func
wk_id 2, que_data 'message 96', slp_tm 0.383186 in proc_func
wk_id 3, que_data 'message 97', slp_tm 0.925658 in proc_func
wk_id 1, que_data 'message 98', slp_tm 0.797679 in proc_func
wk_id 2, que_data 'message 99', slp_tm 0.591475 in proc_func
wk_id 2, que_data 'message 100', slp_tm 0.808996 in proc_func
wk_id 1, que_data 'message 101', slp_tm 0.427783 in proc_func
wk_id 3, que_data 'message 102', slp_tm 0.469321 in proc_func
wk_id 1, que_data 'message 103', slp_tm 0.367678 in proc_func
wk_id 3, que_data 'message 104', slp_tm 0.466130 in proc_func
wk_id 2, que_data 'message 105', slp_tm 0.978609 in proc_func
wk_id 1, que_data 'message 106', slp_tm 0.978941 in proc_func
wk_id 3, que_data 'message 107', slp_tm 0.396088 in proc_func
wk_id 3, que_data 'message 108', slp_tm 0.752204 in proc_func
wk_id 2, que_data 'message 109', slp_tm 0.310375 in proc_func
wk_id 1, que_data 'message 110', slp_tm 0.637855 in proc_func
wk_id 2, que_data 'message 111', slp_tm 0.820937 in proc_func
wk_id 3, que_data 'message 112', slp_tm 0.631646 in proc_func
wk_id 1, que_data 'message 113', slp_tm 0.430918 in proc_func
wk_id 3, que_data 'message 114', slp_tm 0.462042 in proc_func
wk_id 1, que_data 'message 115', slp_tm 0.495235 in proc_func
* 정리
wk_id 가 순차적으로 표시되다가 어느순간 slp_tm 이 길어지는 wk_id 는 그만큼 다시 표시될때까지 좀더 걸리게 된다.
이로 유추할수 있는 것은 proc_func 에서 await 를 만나면 해당 코루틴은 잠시 중지하고 다른 코루틴에게 스케줄링을 양보하는것을 볼수 있다.
만약, 여기서 Asyncio.sleep() 이 아니라 time.sleep() sync 함수를 사용하게 되면 전체적인 런루프가 sync 로 묶기게 되어 async 의 효용가치가 없어진다. async 로 code 를 작성할때 실수 했던 부분이다.