종류
1. ZMQ(Zero MQ)
- 소켓 프로그래밍 API를 대체할 수 있는 정말 괜찮은 라이브러리
- C++로 작성된 라이브러리를 Python에서 활용할 수 있어 상대적으로 빠른 속도의 통신이 가능
- 통신 스택을 구축하기 위해 가볍고 빠른 메시징 구현 제공
2. RabbitMQ
- RabbitMQ는 개발언어인 Erlang의 특성에서 기인해 무척 안정적인 것이 특징이며, 매우 강력한 라우팅 기능을 제공함
- 하지만, 자동 주식거래 시스템에는 메시지의 종류와 처리주체가 많지 않기 때문에 RabbitMQ이 제공하는 강력한 라우팅 기능은 불필요
ZMQ 적용 사례 : Bert-as-Service https://github.com/hanxiao/bert-as-service
- Tensorflow와 ZeroMQ를 사용하여 Production 환경에서 Google BERT를 적용한 오픈소스가 있어 재미삼아 돌려봤다
Start the BERT service
Use Client to Get Sentence Encodes
The server looks like while encoding
- 실제 위 오픈 소스를 통해 어떻게 AI Pipeline이 실제 동작하는지 체감할 수 있었고
- 이 과정에서 ZeroMQ의 구성 요소인 ventilator, worker, sink의 동작을 살펴볼 수 있었다.
ZeroMQ 소스 코드 살펴보기
server side - zmq_decor.py
from contextlib import ExitStack
from zmq.decorators import _Decorator
__all__ = ['multi_socket']
from functools import wraps
import zmq
class _MyDecorator(_Decorator):
def __call__(self, *dec_args, **dec_kwargs):
kw_name, dec_args, dec_kwargs = self.process_decorator_args(*dec_args, **dec_kwargs)
num_socket_str = dec_kwargs.pop('num_socket')
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
num_socket = getattr(args[0], num_socket_str)
targets = [self.get_target(*args, **kwargs) for _ in range(num_socket)]
with ExitStack() as stack:
for target in targets:
obj = stack.enter_context(target(*dec_args, **dec_kwargs))
args = args + (obj,)
return func(*args, **kwargs)
return wrapper
return decorator
class _SocketDecorator(_MyDecorator):
def process_decorator_args(self, *args, **kwargs):
"""Also grab context_name out of kwargs"""
kw_name, args, kwargs = super(_SocketDecorator, self).process_decorator_args(*args, **kwargs)
self.context_name = kwargs.pop('context_name', 'context')
return kw_name, args, kwargs
def get_target(self, *args, **kwargs):
"""Get context, based on call-time args"""
context = self._get_context(*args, **kwargs)
return context.socket
def _get_context(self, *args, **kwargs):
if self.context_name in kwargs:
ctx = kwargs[self.context_name]
if isinstance(ctx, zmq.Context):
return ctx
for arg in args:
if isinstance(arg, zmq.Context):
return arg
# not specified by any decorator
return zmq.Context.instance()
def multi_socket(*args, **kwargs):
return _SocketDecorator()(*args, **kwargs)
Zero MQ가 적용된 Pub/Sub Code
- 데이터를 발송하는 복수의 publisher와 이를 수신하는 복수의 subscriber들이 연결되어 데이터를 분산하는 zmq 패턴에 사용
- REQ-REP 패턴과 다르게 PUB쪽에서 SUB쪽으로 단방향으로 데이터가 전송되며, PUSH-PULL 패턴과 다르게 하나의 메시지를 모든 Subscriber가 수신한다는 특징
- SUB쪽 소켓은 자신에게 맞는 데이터만을 수신하도록 필터를 설정하는데, 보통 PUB-SUB 패턴의 예제에서는 이 필터를 문자열이나 바이트로 설정하여 일치하는 경우만 수신하도록 한다.
publisher.py
import zmq
from time import sleep
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind('tcp://127.0.0.1:2000')
messages = [100, 200, 300]
curMsg = 0
while(True):
sleep(1)
socket.send_pyobj({curMsg:messages[curMsg]})
if(curMsg == 2):
curMsg = 0
else:
curMsg = curMsg + 1
subscriber.py
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:2000')
socket.setsockopt(zmq.SUBSCRIBE,b'') #setsockopt_string()을 사용해서 전달한 문자열을 인코딩한 값이 메시지의 첫부분에 해당할 때에만 메시지가 수신
listener = 0
while(True):
message = socket.recv_pyobj()
if(message.get(listener) != None):
print(message.get(listener))
https://zeromq.org/
https://www.rabbitmq.com/
https://hanxiao.io/2019/01/02/Serving-Google-BERT-in-Production-using-Tensorflow-and-ZeroMQ/?fbclid=IwAR0iOSuhSaFqbNlnuDECE7KMLCEP-akGhTfEF8JJW4bzsGB5Q9Qjyb90Ov0
https://soooprmx.com/using-multipart-data-on-pub-sub-pattern/