Hướng dẫn rabbitmq nodejs - thỏmq nodejs

Nội dung bài viết

Video học lập trình mỗi ngày

Lý do vì sao nên sử dụng RabbitMQ, và vì sao nên dùng nodejs RabbitMQ, thì đã có ở những bài trước. Ở bài viết này chỉ chia sẻ cách vận chuyển tin nhắn khi RabbitMQ khởi động lại. Bài viết này dành cho những anh em đã biết và sử dụng RabbitMQ rồi.

Anh em cũng biết trong môi trường product nó khác với test. Không hệ thống nào là hoản hảo hết, cách tổ chức tốt nhất là giảm tỷ lệ crash mà thôi. Ví dụ 1 năm chết server 2%, thì cố gằng cải tổ nó chỉ còn 1% là hạnh phúc rồi. RabbitMQ không ngoại lệ, nó cũng bị miss như kết nối qua cao, nghẽn, hoặc thay đổi ổ đĩa...

Hướng dẫn rabbitmq nodejs - thỏmq nodejs
Chính vì vậy để tránh một số tính huống như trên thì cách tốt nhất sử dụng một cơ chế kết nối lại tự động phải được thực hiện trong code của mình, nếu không ứng dụng Node.js sẽ bị sập khi RabbitMQ đột tử . Đây là một ví dụ mã về kết nối lại tự động để bạn tham khảo:

Reconnect RabbitMQ

Sender: sender_reconnect.js

const amqp = require("amqplib");

const queue = "demo";

var connection;

// Kết nối RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        await channel.sendToQueue(queue, new Buffer("Hello, Anonystick!"),
        {
            // RabbitMQ - Khi khởi động lại, tiếp tục chạy
            persistent: true
        });

        connection.on("error", function(err)
        {
            console.log(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}

connectRabbitMQ();

Receiver: receiver_reconnect.js

const amqp = require("amqplib");

const queue = "demo";

var connection;

//  Connect RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        await channel.consume(queue, async function(message)
        {
            console.log(message.content.toString());
            channel.ack(message);
        });

        connection.on("error", function(err)
        {
            console.log(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}
connectRabbitMQ();

Trong trường hợp này, ngay cả khi RabbitMQ khởi động lại, người gửi và người nhận có thể tự động kết nối lại RabbitMQ. Vậy thôi đơn giản thôi, đương nhiên ví dụ sẽ không hoàn hảo cho tất cả trưởng hợp nhưng bạn có thể dựa vào đó làm nên điều tuyệt với. Như demo "Hello world", nó làm nên tất cả.

Hướng dẫn rabbitmq nodejs - thỏmq nodejs

Đã đăng vào thg 3 30, 9:40 SA 6 phút đọc 6 phút đọc

RabbitMQ là gì?

RabbitMQ là một message broker (MOM - Message-Oriented Middleware), sử dụng giao thức AMQP (Advanced Message Queue Protocol). RabbitMQ là một phần mềm trung gian được sử dụng như là phương tiện liên lạc giữa các ứng dụng, dịch vụ với nhau. Đây là kiến trúc cơ bản của một message queue:

Hướng dẫn rabbitmq nodejs - thỏmq nodejs
Đây là kiến trúc cơ bản của một message queue:

  • Producer : là ứng dụng client, tạo message và publish tới broker.
  • Consumer : là ứng dụng client khác, kết nối đến queue, subscribe (đăng ký) và xử lý (consume) message.
  • Broker (RabbitMQ) : nhận message từ Producer, lưu trữ chúng an toàn trước khi được lấy từ Consumer.

Minh họa cách thức hoạt động của RabbitMQ như sau:

Hướng dẫn rabbitmq nodejs - thỏmq nodejs

  1. User gửi yêu cầu tạo PDF đến web application.
  2. Web application (Producer) gửi tin nhắn đến RabbitMQ bao gồm dữ liệu từ request như tên và email.
  3. Một Exchange chấp nhận các tin nhắn từ Producer và định tuyến chúng đến Queue (hàng đợi) để tạo PDF.
  4. Ứng dụng xử lý PDF (Consumer) nhận Message từ Queue và bắt đầu xử lý PDF.

Các thuật ngữ cần nắm trong mô hình message queue:

  • Producer: Phía bên đảm nhận việc gửi message. Bạn có thể xem đây là người cần gửi thư cho một ai đó.
  • Consumer: Phía bên đảm nhận việc nhận message. Bạn có thể xem đây là người nhận được thư mà ai đó gửi tới.
  • Message: Thông tin dữ liệu truyền từ Producer đến Consumer. Đây chính là thư được gửi đi chứa nội dung gửi, nó có thể là thư tay, hàng hóa, bưu phẩm…
  • Queue: Nơi lưu trữ messages. Bạn có thể xem đây là một hòm lưu trữ thư với cơ chế, ai gửi trước thì được chuyển phát trước (First in first out)
  • Connection: Kết nối giữa ứng dụng và RabbitMQ broker. Đây có thể coi là các bưu điện đặt tại các tỉnh thành, khi bạn gửi thư thì bạn sẽ phải ra bưu điện đúng không nào
  • Exchange: Là nơi nhận message được publish từ Producer và đẩy chúng vào queue dựa vào quy tắc của từng loại Exchange. Để nhận được message, queue phải được nằm (binding) trong ít nhất 1 Exchange.. Có thể hiểu đây là một khu vực kho tổng hợp tất cả các thư mà mọi người gửi thư tới được tổng hợp, phân loại khu vực, gửi hàng loạt hay không…
  • Binding: Đảm nhận nhiệm vụ liên kết giữa Exchange và Queue. Có thể xem đây là quá trình chuyển thừ hòm lưu trữ thư vào kho phân loại.
  • Routing key: Một key mà Exchange dựa vào đó để quyết định cách để định tuyến message đến queue. Khi kiểm tra địa chỉ trên mỗi bức thư thì Routing key chính là địa chỉ người nhận, khi này việc phân loại thư trong kho sẽ phân loại dựa theo địa chỉ này để đưa tới từng khu vực bưu điện đích.
  • AMQP (Advance Message Queuing Protocol): là giao thức truyền message được sử dụng trong RabbitMQ.
  • User: Gồm username và password giúp truy cập vào RabbitMQ dashboard hoặc tạo connection. Có thể xem đây là những nhân viên bưu điện, họ có thể theo dõi, phân loại, can thiệp, hỗ trợ trong quá trình gửi bưu phẩm.
  • Virtual host/Vhost: Cung cấp những cách riêng biệt để các ứng dụng dùng chung một RabbitMQ instance. Hãy xem đây là những bưu cục chi nhánh rải trên khắp đất nước để thuận tiện cho người gửi cũng như người nhận.

Một điều lưu ý ở đây, tuy là mô hình message-oriented nhưng các ứng dụng, dịch vụ không làm việc trực tiếp với message mà chỉ làm việc qua Exchange. Exchange được phân thành nhiều loại (tham khỏa ở đây), trong bài viết này mình chỉ sử dụng Direct Exchagne (Default Exchange) để demo cho các bạn hiểu cách hoạt động trong RabbitMQ nhé.

Bắt tay tạo một mô hình message queue đơn giản thôi nào ! Mình sẽ tạo một RabbitMQ server bằng docker, và tạo ra Publisher và Consumer bằng Nodejs dùng để kết nối với RabbitMQ server vừa tạo

Nhập lệnh dưới đây để chạy rabbitmq server trên docker

docker run --name rabbitmq -p 5672:5672 rabbitmq

Docker sẽ tạo RabbitMQ server từ RabbitMQ image với port mặc định là 5672.

Sau khi tạo RabbitMQ server thành công, chúng ta sẽ tạo publisher và consumer bằng nodejs: Tạo ra 2 file consumer.js và publisher.js với code lần lượt là

const amqp = require("amqplib")
connect();
async function connect(){
    try {
        const connection = amqp.connect("amqp://localhost:5672");
        const channel = await (await connection).createChannel();

        const result = await channel.assertQueue("jobs");
        channel.sendToQueue("jobs", Buffer.from("Hi it works"))
        console.log("jobs sent successfully")


    }
    catch(ex) {
        console.error(ex)
    }
}
const amqp = require("amqplib")
connect();
async function connect(){
    try {
        const connection = amqp.connect("amqp://localhost:5672");
        const channel = await (await connection).createChannel();

        const result = await channel.assertQueue("jobs");
        channel.sendToQueue("jobs", Buffer.from("Hi it works"))
        console.log("jobs sent successfully")


    }
    catch(ex) {
        console.error(ex)
    }
}

Chạy lần lượt câu lệnh "node publisher.js" và "node consumer.js" sẽ nhận được kết quả

Hướng dẫn rabbitmq nodejs - thỏmq nodejs

Đầu tiên chúng ta tạo ra client là publisher kết nối với rabbitmq server và gửi message tới queue có tên là Jobs, tiếp theo consumer sẽ nhận tất cả các message mà có trong queue đó. Khi chúng ta tắt và mở lại thì consumer nhận và in ra Như vậy consumer sẽ nhận tất cả những message có trong queue và hiển thị như trên. Những message cũ vẫn được hiện thị vì chúng ta chưa đá nó ra khỏi queue. Thêm câu channel.ack(message) vào trong consumer.js để sau khi xử lí message xong chúng ta dequeue nó.

Hướng dẫn rabbitmq nodejs - thỏmq nodejs
Như vậy consumer sẽ nhận tất cả những message có trong queue và hiển thị như trên. Những message cũ vẫn được hiện thị vì chúng ta chưa đá nó ra khỏi queue. Thêm câu channel.ack(message) vào trong consumer.js để sau khi xử lí message xong chúng ta dequeue nó.

Hướng dẫn rabbitmq nodejs - thỏmq nodejs

Trong thực tế , việc xử lý message sẽ phức tạp hơn rất nhiều, mục đích bài viết chỉ cho các bạn cách tiếp nhận và xử lý message một cách đơn giản nhất. hy vọng qua bài biết này, chúng ta có thể hiểu cách hoạt động của RabbitMQ cũng như cách thức truyền và nhận message giữa các ứng dụng. Thật đơn giản đúng không 😆

All rights reserved