FrankTheDevop

RabbitMQ

MLAND Series – Tips III – Loopback RabbitMQ Usage

Frank

The purpose of this post is to help you get up and running with RabbitMQ integrated into your Loopback API.

Prerequisite

You have a working Loopback API Project.

RabbitMQ

Install the Loopback component

Inside your Loopback project folder run:

npm install loopback-component-mq --save

This installs the component and all its dependencies, as usual.

Configure the RabbitMQ component

Register it

By default, Loopback only loads components that are inside known folders. External components live in different locations and need to be registered.
You do so by adding "../node_modules/loopback-component-mq/lib/mixins" to the mixins array in model-config.json.
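If you prefer to script this step, a minimal sketch could look like the following. It assumes the mixins array lives under the _meta key, as in a model-config.json generated by the Loopback CLI. The helper name addMixinPath is made up for this example and is not part of the component.

```javascript
// Hypothetical helper: returns a model-config object with the mixin path
// registered. Assumes the mixins array lives under "_meta".
const MQ_MIXIN_PATH = '../node_modules/loopback-component-mq/lib/mixins';

function addMixinPath(modelConfig, mixinPath = MQ_MIXIN_PATH) {
  const meta = modelConfig._meta || {};
  const mixins = meta.mixins || [];
  if (mixins.includes(mixinPath)) return modelConfig; // already registered
  return {
    ...modelConfig,
    _meta: { ...meta, mixins: [...mixins, mixinPath] },
  };
}

// Example with entries a generated project typically contains
const updated = addMixinPath({
  _meta: { mixins: ['loopback/common/mixins', '../common/mixins'] },
});
console.log(updated._meta.mixins);
```

The function does not touch the file system; you would read model-config.json, pass the parsed object in and write the result back yourself.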

Component configuration

Loopback checks if a configuration is available for a component inside component-config.json.
Therefore add the following template to component-config.json:

{
"loopback-component-mq": {
    "path": "loopback-component-mq",
    "options": {
      "restPort": 15672,
      "acls": [
        {
          "accessType": "*",
          "principalType": "ROLE",
          "principalId": "$unauthenticated",
          "permission": "DENY"
        }
      ]
    },
    "topology": {
      "connection": {
        "uri": "amqp://$username:$password@$host:$port/$vhost",
        "timeout": 30000
      },
      "exchanges": [
        {
          "name": "my_first_exchange",
          "type": "topic",
          "persistent": true
        }
      ],
      "queues": [
        {
          "name": "my_first_queue",
          "subscribe": true,
          "limit": 1
        },
        {
          "name": "my_second_queue",
          "limit": 1
        }
      ],
      "bindings": [
        {
          "exchange": "my_first_exchange",
          "target": "my_first_queue",
          "keys": [
            "my_first_queue"
          ]
        },
        {
          "exchange": "my_first_exchange",
          "target": "my_second_queue",
          "keys": [
            "my_second_queue"
          ]
        }
      ],
      "logging": {
        "adapters": {
          "stdOut": {
            "level": 5,
            "bailIfDebug": true
          }
        }
      }
    }
  }
}
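The placeholders in the uri have to be replaced with your own values. If the password or the vhost contains characters such as @ or /, they have to be percent-encoded, otherwise the URI gets parsed incorrectly. A minimal sketch of a helper for this follows; the function name and the example values are assumptions of mine, not part of the component.

```javascript
// Hypothetical helper: builds an AMQP URI and percent-encodes the parts
// that may contain reserved characters.
function buildAmqpUri({ username, password, host, port = 5672, vhost = '/' }) {
  const enc = encodeURIComponent;
  return `amqp://${enc(username)}:${enc(password)}@${host}:${port}/${enc(vhost)}`;
}

console.log(buildAmqpUri({
  username: 'api',
  password: 'p@ss/word',
  host: 'rabbit.example.com',
  vhost: 'my_vhost',
}));
// amqp://api:p%40ss%2Fword@rabbit.example.com:5672/my_vhost
```

The default vhost "/" ends up as %2F in the URI, which is the encoding the RabbitMQ URI specification asks for.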

Explanation:
The path entry reflects the component name.
You can configure where the management interface is located (restPort) and the acls you like via the options entry.

In topology we define the exchanges and queues we want to use inside this API.

  1. Inside connection, the uri defines where your RabbitMQ service is located (see (1) for the format) and the timeout defines how long the system waits before a connect call fails.

    I strongly suggest defining the timeout, especially if you connect over the internet. Otherwise you instantly receive a connection error.

  2. In exchanges you define the exchanges you want to use, with their name, their type and whether they should persist after the API disconnects (key persistent). To learn more about the types, have a look at (2).
  3. queues contains all queues you want to use, each with its name, whether you want to subscribe to it and a limit on the number of concurrently processed messages.
    If you subscribe to a queue you will receive all messages inside it, so make sure you handle them all. Otherwise you will wonder why your number of messages grows and the API outputs errors. Only subscribe if you implement a consumer for every message you send to this queue (I show you later how it's done).

    I strongly advise you to use limits, otherwise the API might just stop working when too many messages fight for resources. The limit also impacts the performance of your API.

  4. bindings model the connection between exchanges and queues and define which routing keys (key keys) you use.
    exchange is self-explanatory, target is the queue you want to connect. You can add keywords or themes in the keys array; I just put in the name of the queue I'd like to use.
  5. You can configure the logging you want to have inside logging. I added the configuration I use myself for debugging.
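Because bindings refer to exchanges and queues by name, a typo in one of the names is easy to miss. The following sketch checks a topology object for bindings that point at undeclared exchanges or queues. The function name is hypothetical and the check is not part of loopback-component-mq; it only assumes the topology structure from the template.

```javascript
// Hypothetical check: lists bindings that reference an exchange or queue
// that is not declared in the same topology object.
function findBrokenBindings(topology) {
  const exchanges = new Set((topology.exchanges || []).map((e) => e.name));
  const queues = new Set((topology.queues || []).map((q) => q.name));
  const problems = [];
  for (const binding of topology.bindings || []) {
    if (!exchanges.has(binding.exchange)) {
      problems.push(`unknown exchange "${binding.exchange}"`);
    }
    if (!queues.has(binding.target)) {
      problems.push(`unknown queue "${binding.target}"`);
    }
  }
  return problems;
}

// my_second_queue is bound but never declared in this example
const topology = {
  exchanges: [{ name: 'my_first_exchange', type: 'topic', persistent: true }],
  queues: [{ name: 'my_first_queue', subscribe: true, limit: 1 }],
  bindings: [
    { exchange: 'my_first_exchange', target: 'my_first_queue', keys: ['my_first_queue'] },
    { exchange: 'my_first_exchange', target: 'my_second_queue', keys: ['my_second_queue'] },
  ],
};
console.log(findBrokenBindings(topology));
```

You could run such a check against the parsed component-config.json before starting the API.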

    Mixin configuration

    The RabbitMQ component uses a mixin. This way you can configure the consumers and producers per model.

    Add a structure similar to this to your model.json file:

    "mixins": {
        "MessageQueue": {
          "consumers": {
            "consumerMessage": {
              "queue": "my_first_queue",
              "type": "$company.$type.$subtype"
            }
          },
          "producers": {
            "producerGreet": {
              "exchange": "my_first_exchange",
              "options": {
                "routingKey": "my_first_queue",
                "type": "$company.$type.$subtype",
                "contentType": "application/json"
              }
            }
          }
        }
      },
    

    If you already have a mixins key in the model.json just add the inner structure, beginning at "MessageQueue".

    Inside “MessageQueue” you can define a consumers and producers object, in which you define the consumers and producers respectively.

    A consumer has a name (consumerMessage in the example) and needs to know from which queue it should get its messages (key queue) and which message type it is responsible for (key type). If only one message type will occur in the queue you need only one consumer, otherwise you need one per type. The name of the consumer is also the name of the method you have to implement for this model. I'll come to this in a bit.

    A producer has a name too, is connected to an exchange (key exchange) and has some options. Here the keys from the component-config.json come into play, where I said I use the name of the queue I want to target: the routingKey needs to match one of them. Finally we set the contentType; for me this is normally JSON.
    You don't need to implement a producer, the component does it for you. In a moment I will show you how you can call it.
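The exchange in the example is of type topic. For topic exchanges RabbitMQ compares the routingKey with the binding keys word by word (words are separated by dots), where * stands for exactly one word and # for zero or more words. The sketch below only illustrates this matching rule so you can reason about your keys; it is my own illustration, not code from RabbitMQ or the component.

```javascript
// Illustration of topic matching: '*' matches exactly one word,
// '#' matches zero or more words. Words are separated by dots.
function topicMatches(bindingKey, routingKey) {
  const pattern = bindingKey.split('.');
  const words = routingKey.split('.');

  const match = (p, w) => {
    if (p === pattern.length) return w === words.length;
    if (pattern[p] === '#') {
      // '#' may absorb any number of the remaining words, including none
      for (let k = w; k <= words.length; k++) {
        if (match(p + 1, k)) return true;
      }
      return false;
    }
    if (w === words.length) return false;
    if (pattern[p] === '*' || pattern[p] === words[w]) return match(p + 1, w + 1);
    return false;
  };

  return match(0, 0);
}

console.log(topicMatches('my_first_queue', 'my_first_queue')); // true
console.log(topicMatches('my_first_queue', 'my_second_queue')); // false
console.log(topicMatches('pdf.*', 'pdf.generate')); // true
console.log(topicMatches('pdf.#', 'pdf')); // true
```

With plain keys like the queue names used in this post, matching boils down to string equality, which is why routingKey and keys have to be spelled identically.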

    Usage

    Consumer

    As mentioned you need to implement a consumer yourself. The syntax is (ES6 Syntax):

    $model.consumername = (payload) => {
      // A message from a source other than a loopback-component-mq based API
      // may arrive as a JSON string; otherwise the payload is already an object.
      const { message } = typeof payload === 'string' ? JSON.parse(payload) : payload

      try {
        // Do something with the message
        console.log(message)
      } catch (error) {
        // Depending on your architecture you might want to reject the message if an error occurs.
        // A rejected message is not acknowledged and will be re-delivered to you,
        // so use this for a temporary problem with an important message.
        return Promise.reject(error)
      }

      // If everything is alright, acknowledge this message
      return Promise.resolve()
    }
    

    You tell the queue that you handled a message by returning a Promise.resolve(). If you want the message to be re-delivered you send Promise.reject().
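A self-contained sketch of such a consumer is shown below. The Model object is only a stand-in for your Loopback model so the snippet runs on its own, and parsePayload is a hypothetical helper that accepts a Buffer, a JSON string or an already parsed object.

```javascript
// Hypothetical helper: normalises the payload regardless of its origin.
function parsePayload(payload) {
  if (Buffer.isBuffer(payload)) payload = payload.toString('utf8');
  return typeof payload === 'string' ? JSON.parse(payload) : payload;
}

// Stand-in for a Loopback model
const Model = {};

Model.consumerMessage = (payload) => {
  try {
    const { message } = parsePayload(payload);
    if (message === undefined) throw new Error('payload has no message field');
    // ... do the real work with message here ...
    return Promise.resolve(); // acknowledge
  } catch (error) {
    return Promise.reject(error); // not acknowledged, will be re-delivered
  }
};

Model.consumerMessage('{"message":"hello"}')
  .then(() => console.log('acknowledged'));
```

Keep in mind that a message which is permanently malformed would be re-delivered again and again if you reject it, so for that case you may prefer to log the error and acknowledge anyway.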

    Producer

    You can use a producer anywhere inside the scope of your model this way:

    $model.greet = (name) => {
      return $model.producerGreet({ greeting: 'Hi', name: name })
    }
    

    That's it already. The producer sends this message with a JSON payload to the defined exchange with the defined routingKey.

    Conclusion / Lessons learned

    That´s it for today. In this tip you learned:

    • How to install the Loopback RabbitMQ Component
    • How to register the Loopback RabbitMQ Component
    • How to configure the Loopback RabbitMQ Component
    • How to configure consumers and producers for the Loopback RabbitMQ Component
    • How to implement a consumer
    • How to use a producer

    If you have any questions, post them in the comments. Or feel free to send me an email.

    Yours sincerely,

    Frank

    Sources
    (1) https://www.rabbitmq.com/uri-spec.html
    (2) https://www.rabbitmq.com/getstarted.html

MLAND Series – Tips II – Loopback AthenaPDF

Frank

Purpose

When developing an infrastructure for a client project I faced the situation that I needed to support 50 concurrent requests to generate a PDF. The purpose of this post is to show you how I did this using AthenaPDF and RabbitMQ.

Setup

The setup used for this tip is a machine on Digital Ocean*, created and managed through Docker Cloud, with a Loopback API Container, an AthenaPDF Container and RabbitMQ hosted on CloudAMQP.

 

Mixin Definition

You can use a mixin definition similar to this:

"mixins": {
    "MessageQueue": {
      "producers": {
        "producerSendGeneratePDF": {
          "exchange": "generate.pdf",
          "options": {
            "routingKey": "pdf",
            "type": "company.project.messages.generatepdf",
            "contentType": "application/json"
          }
        },
        "producerPdfGenerated": {
          "exchange": "generate.pdf",
          "options": {
            "routingKey": "pdf",
            "type": "company.project.messages.pdfgenerated",
            "contentType": "application/json"
          }
        }
      },
      "consumers": {
        "consumerGeneratePDF": {
          "queue": "client.generate.pdf",
          "type": "company.project.messages.generatepdf"
        },
        "consumerPDFGenerated": {
          "queue": "client.generate.pdf",
          "type": "company.project.messages.pdfgenerated"
        }
      }
    }
  }

 

With this configuration you can execute  Model.producerSendGeneratePDF({data}) and send a message of the following type:

"company.project.messages.generatepdf"

 

The message will invoke Model.consumerGeneratePDF, where you can prepare the HTML document you want to have converted and call the AthenaPDF Container to create the PDF. After the work is done in Model.consumerGeneratePDF you can call producerPdfGenerated({data}) to start the next step of your workflow.

If you go with my config, you implement Model.consumerPDFGenerated. There you can execute whatever next step you need: send an e-mail to your customer, upload the PDF to Amazon S3, and so on.
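The two-step workflow could be sketched like this. Everything is stubbed so the snippet runs on its own: in a real project the component adds producerPdfGenerated to the model, and renderPdf is a hypothetical function standing in for your actual call to the AthenaPDF Container.

```javascript
// Stand-in for a Loopback model. The producer is stubbed to record what
// would be sent to the exchange.
const Model = {
  sent: [],
  producerPdfGenerated(payload) {
    Model.sent.push(payload);
    return Promise.resolve();
  },
};

// Hypothetical: asks the PDF container to convert the HTML and resolves
// with the location of the result. Replace with your own implementation.
Model.renderPdf = (html) => Promise.resolve(`/tmp/${html.length}.pdf`);

Model.consumerGeneratePDF = (payload) => {
  const html = `<h1>Invoice for ${payload.customer}</h1>`;
  // If renderPdf rejects, the rejection propagates and the message is
  // re-delivered; otherwise the follow-up message is published.
  return Model.renderPdf(html).then((pdfPath) =>
    Model.producerPdfGenerated({ customer: payload.customer, pdfPath })
  );
};

Model.consumerPDFGenerated = (payload) => {
  console.log(`PDF ready for ${payload.customer}: ${payload.pdfPath}`);
  return Promise.resolve();
};

// Simulate both steps without a broker
Model.consumerGeneratePDF({ customer: 'ACME' })
  .then(() => Model.consumerPDFGenerated(Model.sent[0]));
```

Returning the promise chain from consumerGeneratePDF means the generate message is only acknowledged once the follow-up message has been handed to the producer.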

 

How to limit the AthenaPDF Container Resource consumption

To limit the memory usage of the AthenaPDF Container, add this to the Docker Stackfile definition of the Container: mem_limit: Xm.

X is the number of MB you allow the Container to use. 256 MB is a good starting point for your measurements.

How to fix Xlib: extension “RANDR” missing on display “:99”

If you are getting this error it means the shared memory area /dev/shm isn't shared between the host and the Container. To fix this add these two lines to the PDF Container definition of the Stackfile:

 

volumes:
  - '/dev/shm:/dev/shm'

 

How to not overload the AthenaPDF Container

Especially when you limit the amount of memory the PDF Container can use, you want to limit the number of PDFs being generated concurrently.

As described in the RabbitMQ Tip, you can limit the number of concurrent tasks that the API endpoint processes at any given moment:

Check the limit key of the queues definition in server/component-config.json, inside the loopback-component-mq definition:

{  
   "loopback-component-mq":{  
      "path":"loopback-component-mq",
      "options":{  
         "restPort":15672,
         "acls":[  
            {
               "accessType":"*",
               "principalType":"ROLE",
               "principalId":"$unauthenticated",
               "permission":"DENY"
            }
         ]
      }
   },
   "topology":{  
      "connection":{  
         "uri":"amqp://user:password@host/vhost",
         "timeout": timeoutinmilliseconds
      },
      "exchanges":[  
         {  
            "name":"exchangename",
            "type":"exchangetype",
            "persistent":persistent
         }
      ],
      "queues":[  
         {
            "name":"queuename",
            "subscribe":true,
            "limit":concurrentnonacknowledgedtasknumber
         }
      ],
      "bindings":[  
         {  
            "exchange":"exchangename",
            "target":"queuename",
            "keys":[  

            ]
         }
      ],
      "logging":{  
         "adapters":{  
            "stdOut":{  
               "level":5,
               "bailIfDebug":true
            }
         }
      }
   }
}

Experiment with the limit in the queues definition and see how many concurrent tasks your infrastructure can handle with your configuration.
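To get a feeling for what the limit does, it helps to picture it as a cap on the number of tasks that are in flight at the same time. The sketch below implements that idea in plain JavaScript. It is not how the component or RabbitMQ implement the limit, and createLimiter is a name I made up for this illustration.

```javascript
// Illustration: at most `limit` tasks run at once, the rest wait in line.
function createLimiter(limit) {
  let active = 0;
  const waiting = [];

  const startNext = () => {
    if (active >= limit || waiting.length === 0) return;
    active++;
    const { task, resolve, reject } = waiting.shift();
    let result;
    try {
      result = Promise.resolve(task());
    } catch (err) {
      result = Promise.reject(err);
    }
    // Free the slot once the task has settled, then start the next one
    result.then(resolve, reject).then(() => {
      active--;
      startNext();
    });
  };

  return {
    run(task) {
      return new Promise((resolve, reject) => {
        waiting.push({ task, resolve, reject });
        startNext();
      });
    },
    stats: () => ({ active, waiting: waiting.length }),
  };
}

// Five fake PDF jobs, but only two may run concurrently
const limiter = createLimiter(2);
const fakePdfJob = (id) => () =>
  new Promise((resolve) => setTimeout(() => resolve(id), 50));
for (let id = 1; id <= 5; id++) {
  limiter.run(fakePdfJob(id)).then((done) => console.log(`job ${done} finished`));
}
console.log(limiter.stats()); // { active: 2, waiting: 3 }
```

A low limit keeps the memory usage of the PDF Container predictable, while a higher limit increases throughput as long as the Container can cope with it.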

 

Scaling

Vertically

The great point about this way of setting up your infrastructure is that you can easily scale vertically. Just give the PDF Container more memory, get a bigger machine and increase the limit of the queue definition.

Horizontally

If you want to scale horizontally you need to add a haproxy container and change your configuration a bit. I will show you how to do it in the next Tip.

 

Sources

AthenaPDF

Digital Ocean* (Affiliate Link)

Docker Cloud

RabbitMQ

CloudAMQP

MLAND Series – Tips I – Loopback RabbitMQ Mistakes

Frank

Purpose

The purpose of this post is to highlight the most common mistakes I experienced with Loopback and RabbitMQ myself.

Mistakes I made and you might have made too

  1. Receiving [rabbit.unhandled] Message
  2. Messages keep getting redelivered
  3. Getting undefined from payload

Fix Receiving [rabbit.unhandled]

This error can result from 4 different mistakes:

  • Forgotten to load mixin in model-config.json
  • Forgotten to configure component-config.json
  • Messed up the Entry in model.json
  • Connecting to Queue while not handling all Message Types

Fix Forgotten to load mixin in model-config.json

See below for how the entry for model-config.json should look.

Fix Forgotten to configure component-config.json

See below for how the entry for component-config.json should look.

Fix Messed up the Entry in model.json

See below for how the entry for model.json should look.

Connecting to Queue while not handling all Message Types

If you get [rabbit.unhandled] Message of x on queue y, connection ‘default’ was not processed by any registered handlers and the messages only get re-queued and redelivered, you have probably connected an API to a queue where you receive messages you don't handle.

loopback-component-mq picks the next message available on the queues it is connected to, even if you haven't implemented handling for these messages.

So make sure you connect each API only to queues for which it handles all message types. Otherwise one API will eventually block the queue because it gets messages it can't process.
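A small sanity check can help to spot this before it happens in production. The sketch below compares the message types you expect on each queue with the consumers object from model.json. The function name and the expectedTypes structure are my own assumptions for this illustration; the component does not provide such a check.

```javascript
// Hypothetical check: expectedTypes maps a queue name to the message types
// published to it; consumers is the "consumers" object from model.json.
function findUnhandledTypes(expectedTypes, consumers) {
  const handled = new Set(
    Object.values(consumers).map((c) => `${c.queue}|${c.type}`)
  );
  const unhandled = [];
  for (const [queue, types] of Object.entries(expectedTypes)) {
    for (const type of types) {
      if (!handled.has(`${queue}|${type}`)) unhandled.push({ queue, type });
    }
  }
  return unhandled;
}

const consumers = {
  consumerGeneratePDF: {
    queue: 'client.generate.pdf',
    type: 'company.project.messages.generatepdf',
  },
};
const expectedTypes = {
  'client.generate.pdf': [
    'company.project.messages.generatepdf',
    'company.project.messages.pdfgenerated',
  ],
};
// Reports pdfgenerated, because no consumer is declared for it
console.log(findUnhandledTypes(expectedTypes, consumers));
```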

Fix Message keep getting redelivered

Check the function you implemented and make sure you return a Promise.resolve if your function succeeded and Promise.reject if it failed.
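If you tend to forget the return value, a thin wrapper can take care of it. The following sketch wraps a handler so that it always returns a promise: it resolves when the handler finishes and rejects when the handler throws or returns a rejected promise. alwaysSettle is a hypothetical name, not something the component ships with.

```javascript
// Hypothetical wrapper: guarantees that the consumer returns a promise.
function alwaysSettle(handler) {
  return (payload) => {
    try {
      return Promise.resolve(handler(payload)).then(() => undefined);
    } catch (error) {
      return Promise.reject(error);
    }
  };
}

// Stand-in for a Loopback model
const Model = {};

Model.consumerMessage = alwaysSettle((payload) => {
  console.log('handling', payload.message); // no explicit return needed
});

Model.consumerMessage({ message: 'hello' })
  .then(() => console.log('acknowledged'));
```

One caveat: if the handler starts asynchronous work without returning its promise, the wrapper resolves before that work is done, so the message would be acknowledged too early.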

Fix Getting undefined from payload

If the origin of your message is not Loopback, you might need to use JSON.parse to retrieve the data in the message. See the model.js section below for how the code should look.

Files to Look at

  1. server/model-config.json
  2. server/component-config.json
  3. common/models/model.json / server/models/model.json
  4. common/models/model.js / server/models/model.js

server/model-config.json

This file holds the information about which mixins to load for this project.

This happens inside the "mixins": [] block.

Make sure you load loopback-component-mq with an entry like this:

"../node_modules/loopback-component-mq/lib/mixins"

server/component-config.json

This file holds the general configuration for components that need it.

Make sure you configure loopback-component-mq with an entry like this:

{  
   "loopback-component-mq":{  
      "path":"loopback-component-mq",
      "options":{  
         "restPort":15672,
         "acls":[  
            {
               "accessType":"*",
               "principalType":"ROLE",
               "principalId":"$unauthenticated",
               "permission":"DENY"
            }
         ]
      }
   },
   "topology":{  
      "connection":{  
         "uri":"amqp://user:password@host/vhost",
         "timeout": timeoutinmilliseconds
      },
      "exchanges":[  
         {  
            "name":"exchangename",
            "type":"exchangetype",
            "persistent":persistent
         }
      ],
      "queues":[  
         {
            "name":"queuename",
            "subscribe":true,
            "limit":concurrentnonacknowledgedtasknumber
         }
      ],
      "bindings":[  
         {  
            "exchange":"exchangename",
            "target":"queuename",
            "keys":[  

            ]
         }
      ],
      "logging":{  
         "adapters":{  
            "stdOut":{  
               "level":5,
               "bailIfDebug":true
            }
         }
      }
   }
}

 

 

common/models/model.json / server/models/model.json

In the model.json you configure the consumers and producers that this model uses, together with the queue or exchange they operate on and the type the message has.

Make sure you configure loopback-component-mq with an entry like this:

{
   "mixins":{
      "MessageQueue":{
         "consumers":{
            "name_of_function_to_implement":{
               "queue":"queue_name_from_component_config",
               "type":"name_of_message_type"
            }
         },
         "producers":{
            "name_of_function_to_implement":{
               "exchange":"exchange_name_from_component_config",
               "options":{  
                  "routingKey":"routing_key_name",
                  "type":"name_of_message_type",
                  "contentType":"content_type_you_wish"
               }
            }
         }
      }
   }
}

 

 

common/models/model.js / server/models/model.js

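In the model.js you implement the consumer functions you declared in the model.json file. The producer functions are provided by the component.

Make sure you implement it with an entry like this:

Model.name_of_function_to_implement = (payload) => {
  // If sent from Loopback, the payload is already an object.
  // If sent from another source like Python, parse the JSON string first.
  const { variable_name } = typeof payload === 'string' ? JSON.parse(payload) : payload

  // Handle variable_name here, then acknowledge the message
  return Promise.resolve()
}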

If your queue needs acknowledgements make sure to return a Promise.resolve().

If something went wrong return a Promise.reject(). RabbitMQ redelivers the Message then.