* ASGI (Asynchronous Server Gateway Interface)

그대로 읽으면 비동기 서버 게이트웨이 인터페이스 이다. 

자세한 의미는 여기 을 참조한다. 대강 보면 웹 서버, 프레임워크, 응용프로그램 들의 호환성 표준 이라 생각 하면 될듯하다.


아래에서는 Async web server ( uvicorn / starlette ) 을 구성하고 동작시키는 예제를 확인 할것이다. (on Docker)


* sample_code


# 환경

## Host (Docker build)

NAME="Ubuntu" VERSION="18.04.1 LTS (Bionic Beaver)"

Docker version 18.06.1-ce, build e68fc7a


## Container OS /home/anywon # cat /etc/os-release NAME="Alpine Linux" ID=alpine VERSION_ID=3.8.1

## Lan /home/anywon # python3.6 -V Python 3.6.6

## Pip lib

starlette==0.9.9 uvicorn==0.3.23


Python 에서 async function 는 아래와 같이 def 지시어 앞에 async 가 붙는다.

async def test(): return 0

* sample_code 를 clone 후 script dir 에 있는 build.web_server.sh 를 실행하면 자동으로 docker images 가 생성이 된다.
Container OS 는 alpine 이며, Python 과 uvicorn, starlette 를 설치하여 구동이 된다.


# docker images

REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
web_server          1.0                 82e813dc9a6d        4 days ago          233MB

Docker images build 가 끝이나면 docker images command 를 통하여 images 를 확인할 수 있다. 그뒤

run_web_server_container.sh 를 실행하면 자동으로 Container 가 구동 되게 된다.


# docker ps -a
CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS                      PORTS                    NAMES
fb5e0d810f66        web_server:1.0        "/usr/bin/supervisord"   30 minutes ago      Up 30 minutes                                        web_server

# ps -aux | grep python

root     13497  0.0  0.1  94976 24104 pts/0    S    17:28   0:00 /usr/bin/python3.6 /usr/bin/uvicorn --fd 0 app_server:app --log-level=info --http=h11
root     13498  0.0  0.1  94976 24024 pts/0    S    17:28   0:00 /usr/bin/python3.6 /usr/bin/uvicorn --fd 0 app_server:app --log-level=info --http=h11
root     13499  0.0  0.1  94976 24148 pts/0    S    17:28   0:00 /usr/bin/python3.6 /usr/bin/uvicorn --fd 0 app_server:app --log-level=info --http=h11
root     13500  0.0  0.1  94976 23984 pts/0    S    17:28   0:00 /usr/bin/python3.6 /usr/bin/uvicorn --fd 0 app_server:app --log-level=info --http=h11


위와 같이 STATUS 가 Up 상태이면 정상적으로 서버가 구동중이다. 또는 ps -aux | grep python 을 사용하여 확인이 가능하다.


* worker count 를 증가 하고 싶으면 아래와 같이 변경을 하면 된다.

files/conf 의 supervisord.conf 을 열고

[fcgi-program:uvicorn]
directory=/home/anywon/web_server
socket=tcp://0.0.0.0:18080
command=/usr/bin/uvicorn --fd 0 app_server:app --log-level=info --http=h11
numprocs=4
process_name=web_server-%(process_num)d
stdout_logfile=/home/anywon/supervisor/web_server_stdout.log
stdout_logfile_maxbytes=50MB   ; max # logfile bytes b4 rotation (default 50MB)
stdout_logfile_backups=10     ; # of stdout logfile backups (default 10)
stderr_logfile=/home/anywon/supervisor/web_server_stderr.log
stderr_logfile_maxbytes=50MB   ; max # logfile bytes b4 rotation (default 50MB)
stderr_logfile_backups=10     ; # of stdout logfile backups (default 10)

fcgi-program:uvicorn sction 의 numprocs=4 값을 변경 한다. (기본값은 4로 설정)



* 실행 코드

# Ref (uvicorn)   : https://www.uvicorn.org/
# Ref (starlette) : https://www.starlette.io/

from starlette.routing      import Mount, Route, Router
from starlette.applications import Starlette
from starlette.responses    import Response, JSONResponse
from starlette.requests     import Request

async def ping(request):
    headers = {'Content-Type' : 'text/plain'}
    return  Response('OK', status_code = 200, headers = headers)


async def sample(request):

    # Get body
    body = await request.body()

    # Get uri_1
    uri_1 = request.path_params['uri_1']

    # Get uri_2
    uri_2 = request.path_params['uri_2']

    # Get full url with queryString
    full_url = request.url

    # Get request headers
    headers    = request.headers

    # Get QueryString : dict
    qp = request.query_params

    headers = {
        'Content-Type' : 'application/json',
        'keep-alive' : 'timeout=30, max=100',
        'connection' : 'Keep-Alive'
    }

    # Response 
    # return  Response(data, status_code = 200, headers = headers])

    # Response JSON
    return  JSONResponse(data_json, status_code = 200, headers = headers)


API_VERSION = 'v1'

app = Starlette()
app.add_route('/', ping, methods=["GET"])
app.add_route('/', ping, methods=["PUT"])
app.add_route('/api/test' + API_VERSION + '/{uri_1}/{uri_2}', sample, methods=["GET"])
app.add_route('/api/test' + API_VERSION + '/{upload_path:path}', sample, methods=["PUT"])


실행이 되면 위의 app_server.py code 가 실행이 되며, add_route 를 통하여 원하는 uri 를 생성 및 method 를 지정할수 있다.

해당 route 의 request 가 발생되어 처리할 function 을 상단에 지정할수 있으며, 기본적으로 async  def 로 시작하여 async code base 를 확인할수 있다.

기본적인 사용 방법은 위의 sample code 를 보면 확인이 가능하며, 추가적으로 궁금한 사항은 여기 에서 확인 가능 하다.




'기술 > Python' 카테고리의 다른 글

Asyncio + ZeroMQ 를 사용한 Producer & Consumer  (0) 2019.02.11

* Producer & Consumer ( 생산자 & 소비자) 에 문제를 Python3.6 으로 작성해 보았다.

Asyncio (비동기) 작성되었으며, Message queue 인 ZeroMQ 를 사용하여 Producer 이하 (p) 가 Consumer 이하 (c) 에게 원하는 Message (broadcasting) 를 전달 후 각 c 들이 data  를 가져와 처리하는 예제 code 이다.

# 실행 ( pub )

import asyncio import sys, signal import zmq import zmq.asyncio kill = False def bind_zmq(port): print('try bind localhost %d' % port) context = zmq.asyncio.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://127.0.0.1:%s" % port) return socket async def pub_func(): socket = bind_zmq(10010) count = 1 await asyncio.sleep(1) while True: data = "'message %d'" % count print('send_msg %s' % data) await socket.send_string(data) count = count + 1 if kill: break await asyncio.sleep(0.1) socket.close() async def async_task(): fts = [asyncio.ensure_future(pub_func())] for f in asyncio.as_completed(fts): await f def signal_handler(signal, frame): print('signal num %d' % signal) global kill kill = True if __name__ == "__main__": signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) loop = asyncio.get_event_loop() loop.run_until_complete(async_task()) loop.close()

# 실행 ( sub )

import asyncio import sys, signal import zmq import zmq.asyncio import random def connect_zmq(port): context = zmq.asyncio.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://127.0.0.1:%s" % port) socket.setsockopt_string(zmq.SUBSCRIBE, '') # socket.setsockopt(zmq.SUBSCRIBE, b'') return socket async def sub_func(que): print('call sub_func') socket = connect_zmq(10010) while True: sub_msg = await socket.recv_string() # print('sub_msg %s in sub_func' % sub_msg) qs = que.qsize() if qs >= 90: print('warning que size over 90') if qs >= 98: print('dont inque') continue que.put_nowait(sub_msg) socket.close() async def proc_func(wk_id, que): print('call wk_id %d proc_func' % wk_id) while True: que_data = await que.get() slp_tm = random.uniform(0.3, 1.0) print('wk_id %d, que_data %s, slp_tm %f in proc_func' % (wk_id, que_data, slp_tm)) await asyncio.sleep(slp_tm) async def async_tasks(): que = asyncio.Queue(maxsize = 100) # fts = [asyncio.ensure_future(sub_func(que)), fts = [asyncio.ensure_future(proc_func(wk_id, que)) for wk_id in range(1,4) ] fts.append(asyncio.ensure_future(sub_func(que))) for f in asyncio.as_completed(fts): await f await que.join() def signal_handler(signal, frame): print('signal num %d' % signal) sys.exit(1) if __name__ == "__main__": signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) loop = asyncio.get_event_loop() loop.run_until_complete(async_tasks()) loop.close()

code 는 위와 같다. local address 의 10010 port 를 사용하며 pub code 가 message 값을 1씩 증가시키면서 publish 를 하게 되면 sub code 는 n개의 ensure_future object 로 publish 된 data 를 subscribe 하여 data 를 처리하는 방식이다. n번째의 ensure_future 객체 는 random 값으로 asyncio.sleep 를 각기 다르게 주기 때문에 어떤 객체부터 끝나는지 알수 없게 된다.


아래는 코드의 결과 이다.

# Pub


send_msg 'message 87' send_msg 'message 88' send_msg 'message 89' send_msg 'message 90' send_msg 'message 91' send_msg 'message 92' send_msg 'message 93' send_msg 'message 94' send_msg 'message 95' send_msg 'message 96' send_msg 'message 97' send_msg 'message 98' send_msg 'message 99' send_msg 'message 100' send_msg 'message 101' send_msg 'message 102' send_msg 'message 103' send_msg 'message 104' send_msg 'message 105' send_msg 'message 106' send_msg 'message 107' send_msg 'message 108' send_msg 'message 109' send_msg 'message 110' send_msg 'message 111'


# Sub


call wk_id 1 proc_func call wk_id 2 proc_func call wk_id 3 proc_func call sub_func wk_id 1, que_data 'message 90', slp_tm 0.405087 in proc_func wk_id 2, que_data 'message 91', slp_tm 0.699106 in proc_func wk_id 3, que_data 'message 92', slp_tm 0.806175 in proc_func wk_id 1, que_data 'message 93', slp_tm 0.977847 in proc_func wk_id 2, que_data 'message 94', slp_tm 0.343587 in proc_func wk_id 3, que_data 'message 95', slp_tm 0.348311 in proc_func wk_id 2, que_data 'message 96', slp_tm 0.383186 in proc_func wk_id 3, que_data 'message 97', slp_tm 0.925658 in proc_func wk_id 1, que_data 'message 98', slp_tm 0.797679 in proc_func wk_id 2, que_data 'message 99', slp_tm 0.591475 in proc_func wk_id 2, que_data 'message 100', slp_tm 0.808996 in proc_func wk_id 1, que_data 'message 101', slp_tm 0.427783 in proc_func wk_id 3, que_data 'message 102', slp_tm 0.469321 in proc_func wk_id 1, que_data 'message 103', slp_tm 0.367678 in proc_func wk_id 3, que_data 'message 104', slp_tm 0.466130 in proc_func wk_id 2, que_data 'message 105', slp_tm 0.978609 in proc_func wk_id 1, que_data 'message 106', slp_tm 0.978941 in proc_func wk_id 3, que_data 'message 107', slp_tm 0.396088 in proc_func wk_id 3, que_data 'message 108', slp_tm 0.752204 in proc_func wk_id 2, que_data 'message 109', slp_tm 0.310375 in proc_func wk_id 1, que_data 'message 110', slp_tm 0.637855 in proc_func wk_id 2, que_data 'message 111', slp_tm 0.820937 in proc_func wk_id 3, que_data 'message 112', slp_tm 0.631646 in proc_func wk_id 1, que_data 'message 113', slp_tm 0.430918 in proc_func wk_id 3, que_data 'message 114', slp_tm 0.462042 in proc_func wk_id 1, que_data 'message 115', slp_tm 0.495235 in proc_func



* 정리

wk_id 가 순차적으로 표시되다가 어느순간 slp_tm 이 길어지는 wk_id 는 그만큼 다시 표시될때까지 좀더 걸리게 된다.

이로 유추할수 있는 것은 proc_func 에서 await 를 만나면 해당 코루틴은 잠시 중지하고 다른 코루틴에게 스케줄링을 양보하는것을 볼수 있다.

만약, 여기서 Asyncio.sleep() 이 아니라 time.sleep() sync 함수를 사용하게 되면 전체적인 런루프가 sync 로 묶기게 되어 async 의 효용가치가 없어진다. async 로 code 를 작성할때 실수 했던 부분이다.

'기술 > Python' 카테고리의 다른 글

python asgi server ( uvicorn / starlette )  (0) 2019.02.19

+ Recent posts