Message Queue

NW
목록으로 돌아가기
메시지 큐


종류

1. ZMQ(Zero MQ)

- 소켓 프로그래밍 API를 대체할 수 있는 정말 괜찮은 라이브러리

- C++로 작성된 라이브러리를 Python에서 활용할 수 있어 상대적으로 빠른 속도의 통신이 가능

- 통신 스택을 구축하기 위해 가볍고 빠른 메시징 구현 제공

2. RabbitMQ

- RabbitMQ는 개발언어인 Erlang의 특성에서 기인해 무척 안정적인 것이 특징이며, 매우 강력한 라우팅 기능을 제공함

- 하지만, 자동 주식거래 시스템에는 메시지의 종류와 처리주체가 많지 않기 때문에 RabbitMQ이 제공하는 강력한 라우팅 기능은 불필요

ZMQ 적용 사례 : Bert-as-Service https://github.com/hanxiao/bert-as-service

- Tensorflow와 ZeroMQ를 사용하여 Production 환경에서 Google BERT를 적용한 오픈소스가 있어 재미삼아 돌려봤다

Start the BERT service


Use Client to Get Sentence Encodes


The server looks like while encoding


- 실제 위 오픈 소스를 통해 어떻게 AI Pipeline이 실제 동작하는지 체감할 수 있었고

- 이 과정에서 ZeroMQ의 구성 요소인 ventilator, worker, sink의 동작을 살펴볼 수 있었다.

ZeroMQ 소스 코드 살펴보기


server side - zmq_decor.py 

from contextlib import ExitStack

from zmq.decorators import _Decorator

__all__ = ['multi_socket']

from functools import wraps

import zmq


class _MyDecorator(_Decorator):
    def __call__(self, *dec_args, **dec_kwargs):
        kw_name, dec_args, dec_kwargs = self.process_decorator_args(*dec_args, **dec_kwargs)
        num_socket_str = dec_kwargs.pop('num_socket')

        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                num_socket = getattr(args[0], num_socket_str)
                targets = [self.get_target(*args, **kwargs) for _ in range(num_socket)]
                with ExitStack() as stack:
                    for target in targets:
                        obj = stack.enter_context(target(*dec_args, **dec_kwargs))
                        args = args + (obj,)

                    return func(*args, **kwargs)

            return wrapper

        return decorator


class _SocketDecorator(_MyDecorator):
    def process_decorator_args(self, *args, **kwargs):
        """Also grab context_name out of kwargs"""
        kw_name, args, kwargs = super(_SocketDecorator, self).process_decorator_args(*args, **kwargs)
        self.context_name = kwargs.pop('context_name', 'context')
        return kw_name, args, kwargs

    def get_target(self, *args, **kwargs):
        """Get context, based on call-time args"""
        context = self._get_context(*args, **kwargs)
        return context.socket

    def _get_context(self, *args, **kwargs):
        if self.context_name in kwargs:
            ctx = kwargs[self.context_name]

            if isinstance(ctx, zmq.Context):
                return ctx

        for arg in args:
            if isinstance(arg, zmq.Context):
                return arg
        # not specified by any decorator
        return zmq.Context.instance()


def multi_socket(*args, **kwargs):
    return _SocketDecorator()(*args, **kwargs)


Zero MQ가 적용된 Pub/Sub Code

- 데이터를 발송하는 복수의 publisher와 이를 수신하는 복수의 subscriber들이 연결되어 데이터를 분산하는 zmq 패턴에 사용

- REQ-REP 패턴과 다르게 PUB쪽에서 SUB쪽으로 단방향으로 데이터가 전송되며, PUSH-PULL 패턴과 다르게 하나의 메시지를 모든 Subscriber가 수신한다는 특징

- SUB쪽 소켓은 자신에게 맞는 데이터만을 수신하도록 필터를 설정하는데, 보통 PUB-SUB 패턴의 예제에서는 이 필터를 문자열이나 바이트로 설정하여 일치하는 경우만 수신하도록 한다.

publisher.py


import zmq
from time import sleep

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind('tcp://127.0.0.1:2000')

messages = [100, 200, 300]
curMsg = 0

while(True):
    sleep(1)
    socket.send_pyobj({curMsg:messages[curMsg]})
    if(curMsg == 2):
        curMsg = 0
    else:
        curMsg = curMsg + 1


subscriber.py


import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:2000')
socket.setsockopt(zmq.SUBSCRIBE,b'') #setsockopt_string()을 사용해서 전달한 문자열을 인코딩한 값이 메시지의 첫부분에 해당할 때에만 메시지가 수신

listener = 0

while(True):
    message = socket.recv_pyobj()
    if(message.get(listener) != None):
       print(message.get(listener))



[Reference]

https://zeromq.org/

https://www.rabbitmq.com/

https://hanxiao.io/2019/01/02/Serving-Google-BERT-in-Production-using-Tensorflow-and-ZeroMQ/?fbclid=IwAR0iOSuhSaFqbNlnuDECE7KMLCEP-akGhTfEF8JJW4bzsGB5Q9Qjyb90Ov0

https://soooprmx.com/using-multipart-data-on-pub-sub-pattern/ Buy Me A Coffee