You are reading the documentation for an outdated Corteza release. 2023.9 is the latest stable Corteza release.

Messagebus

The messagebus feature allows a producer instance to offload work to a consumer instance via message queues. The messagebus is primarily designed for use with workflows, but it can be extended to work elsewhere.

DevNote: the following content is under construction and needs a lot more work from the low-level perspective.

Terminology

Queue

A message queue is a combination of a name, a consumer, and metadata. The metadata provides contextual information to the producer and the consumer.

Queue producer

Producers put messages into the message queue. Out of the box, the messagebus defines producers as workflow handlers that are accessed through the Queue message send function.

Queue consumer

Consumers are adapters that consume messages from the message queue. Each consumer is implemented for a specific message destination. Out of the box, the messagebus defines consumers for the eventbus and the store.

Queue message lifecycle

Write to consumer

A workflow example:
  • register the queue handler

  • use the global messagebus service to push() the payload to a specific queue

  • payload gets pushed to an internal messagebus channel

  • the consumer configured for this queue (for example, store) is invoked

  • the consumer adapter pushes the payload to the destination

    • could be another queue in store

    • could be eventbus which triggers another workflow
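The steps above can be sketched in Go as follows. This is a minimal illustration under stated assumptions, not Corteza's actual implementation: the Bus, Consumer, and memoryStore types, their method names, and the queue name example_queue are all hypothetical. The sketch only shows the shape of the flow: register a consumer for a queue, push a payload to an internal channel, and let a dispatch loop hand the payload to the consumer registered for that queue.

```go
package main

import (
	"fmt"
	"sync"
)

// Consumer is a hypothetical adapter interface; it is not Corteza's actual type.
type Consumer interface {
	Write(payload []byte) error
}

// message pairs a payload with the name of the queue it was pushed to.
type message struct {
	queue   string
	payload []byte
}

// Bus holds the registered consumers and the internal channel (hypothetical).
type Bus struct {
	mu        sync.RWMutex
	consumers map[string]Consumer
	in        chan message
}

// NewBus creates a bus whose internal channel buffers up to buffer messages.
func NewBus(buffer int) *Bus {
	return &Bus{consumers: map[string]Consumer{}, in: make(chan message, buffer)}
}

// Register binds a consumer to a queue name.
func (b *Bus) Register(queue string, c Consumer) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.consumers[queue] = c
}

// Push places the payload on the internal channel; it blocks if the buffer is full.
func (b *Bus) Push(queue string, payload []byte) {
	b.in <- message{queue: queue, payload: payload}
}

// Close signals that no more messages will be pushed.
func (b *Bus) Close() { close(b.in) }

// Dispatch drains the internal channel and hands each payload to the consumer
// registered for its queue. It returns once the channel is closed and empty,
// or on the first error.
func (b *Bus) Dispatch() error {
	for m := range b.in {
		b.mu.RLock()
		c, ok := b.consumers[m.queue]
		b.mu.RUnlock()
		if !ok {
			return fmt.Errorf("no consumer registered for queue %q", m.queue)
		}
		if err := c.Write(m.payload); err != nil {
			return err
		}
	}
	return nil
}

// memoryStore stands in for the store consumer; it only collects payloads.
type memoryStore struct{ rows [][]byte }

func (s *memoryStore) Write(p []byte) error {
	s.rows = append(s.rows, p)
	return nil
}

func main() {
	bus := NewBus(8)
	store := &memoryStore{}
	bus.Register("example_queue", store)

	bus.Push("example_queue", []byte(`{"id":42}`))
	bus.Close()

	if err := bus.Dispatch(); err != nil {
		fmt.Println("dispatch failed:", err)
		return
	}
	fmt.Printf("stored %d payload(s): %s\n", len(store.rows), store.rows[0])
}
```

In a long-running service the dispatch loop would typically run in its own goroutine; it runs inline here only to keep the sketch deterministic.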

Read from producer

Producers are implementation-specific. Corteza provides one as a workflow queue handler, which writes the payload to the queue through the messagebus.

The polling delay is the only metadata currently in use; the store producer uses it to poll the database, a stopgap until a subscriber is implemented. More generally, metadata is how contextual data is passed to the producer and the consumer.
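To illustrate how a polling delay taken from queue metadata could drive a producer, here is a minimal Go sketch. The QueueMeta type, its PollDelay field, and the Poll function are assumptions made for this example and are not taken from the Corteza codebase; the fetch callback stands in for a database query.

```go
package main

import (
	"fmt"
	"time"
)

// QueueMeta is a hypothetical metadata shape; only the polling delay is modelled.
type QueueMeta struct {
	PollDelay time.Duration
}

// Poll calls fetch every meta.PollDelay and passes each returned payload to
// handle. It returns when stop is closed, or immediately if the delay is not
// positive (time.NewTicker panics on a non-positive duration).
func Poll(stop <-chan struct{}, meta QueueMeta, fetch func() [][]byte, handle func([]byte)) {
	if meta.PollDelay <= 0 {
		return
	}
	ticker := time.NewTicker(meta.PollDelay)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			for _, p := range fetch() {
				handle(p)
			}
		}
	}
}

func main() {
	pending := [][]byte{[]byte("first"), []byte("second")}
	// fetch stands in for a database query; it returns the pending rows once.
	fetch := func() [][]byte {
		out := pending
		pending = nil
		return out
	}

	stop := make(chan struct{})
	// Stop polling after a short while so the example terminates.
	time.AfterFunc(100*time.Millisecond, func() { close(stop) })

	Poll(stop, QueueMeta{PollDelay: 10 * time.Millisecond}, fetch, func(p []byte) {
		fmt.Printf("polled: %s\n", p)
	})
}
```

A subscriber-based producer would replace the ticker with a notification from the data source, which is why polling is described as a temporary measure.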

Internal Corteza messagebus to eventbus flow

Corteza Workflow can currently incorporate messagebus capabilities via the eventbus mechanism. In this case, the messagebus producer can be an eventbus handler; in our case, it is the already implemented queue trigger (refer to the automation/workflows documentation).

The queue handler (the producer in the diagram) needs to be triggered to write to a messagebus queue; this is done via the eventbus mechanism. Once the payload reaches the producer, the payload and the queue name are both pushed to the internal channel of the messagebus service (via an exposed method).

The payload is not validated. It is handled as a []byte, and any unmarshaling or decoding needs to be done on the consumer side.
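Because the payload travels as raw bytes, a consumer has to decode and validate it itself. The following Go sketch shows one way to do that with encoding/json. The GithubUser type, its id field, and the DecodeGithubUser function are hypothetical and chosen only for illustration; the actual payload format is whatever the producer and consumer agree on.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// GithubUser is a hypothetical payload shape agreed on by producer and consumer.
type GithubUser struct {
	ID int64 `json:"id"`
}

// DecodeGithubUser turns the raw bytes into a typed value and rejects
// malformed or incomplete payloads, since nothing upstream validates them.
func DecodeGithubUser(payload []byte) (GithubUser, error) {
	var u GithubUser
	if err := json.Unmarshal(payload, &u); err != nil {
		return GithubUser{}, fmt.Errorf("decode payload: %w", err)
	}
	if u.ID <= 0 {
		return GithubUser{}, fmt.Errorf("payload has no valid id")
	}
	return u, nil
}

func main() {
	for _, raw := range [][]byte{[]byte(`{"id":42}`), []byte(`not json`)} {
		u, err := DecodeGithubUser(raw)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Println("user id:", u.ID)
	}
}
```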

Once the messagebus service fetches the payload from the internal channel, it looks up the already registered and instantiated consumer handler for that queue. The payload is then processed by the consumer-specific logic.

The message flow in our example does not end there. Since we also have an eventbus consumer, the payload is transformed again (in a workflow), and a different payload is then pushed to another queue.

An example would be a website registration page that pushes the user's GitHub ID to a queue (let's call it users_github). The eventbus would then receive the payload with the ID, fetch the user metadata from GitHub, write it to the store and, on success, write the user's avatar URL via the eventbus to another queue (let's call it users_avatars). The eventbus would trigger a new workflow on that queue, which would fetch the binary data from the URLs and write it to the designated store.
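The users_github to users_avatars chain described above can be sketched as a set of per-queue handlers, each of which may emit a follow-up message for another queue. This is a simplified, in-process illustration: the Message and Handler types, the Run function, and the avatar URL format are assumptions for this example, and the real flow would run through workflows and the eventbus rather than a loop.

```go
package main

import "fmt"

// Message is a payload together with the name of the queue it targets.
type Message struct {
	Queue   string
	Payload []byte
}

// Handler processes one payload and may return a follow-up message for
// another queue; a nil message ends the pipeline.
type Handler func(payload []byte) (*Message, error)

// Run feeds the first message to its handler and keeps following the
// returned messages until a handler returns nil. It returns the queue names
// visited, in order. Handlers that form a cycle would loop forever.
func Run(handlers map[string]Handler, first Message) ([]string, error) {
	var visited []string
	next := &first
	for next != nil {
		h, ok := handlers[next.Queue]
		if !ok {
			return visited, fmt.Errorf("no handler for queue %q", next.Queue)
		}
		visited = append(visited, next.Queue)
		out, err := h(next.Payload)
		if err != nil {
			return visited, err
		}
		next = out
	}
	return visited, nil
}

func main() {
	var savedAvatars []string

	handlers := map[string]Handler{
		// Stage 1: stands in for fetching user metadata and deriving the avatar URL.
		"users_github": func(p []byte) (*Message, error) {
			url := "https://example.invalid/avatars/" + string(p) // hypothetical URL
			return &Message{Queue: "users_avatars", Payload: []byte(url)}, nil
		},
		// Stage 2: stands in for downloading the avatar and writing it to the store.
		"users_avatars": func(p []byte) (*Message, error) {
			savedAvatars = append(savedAvatars, string(p))
			return nil, nil
		},
	}

	visited, err := Run(handlers, Message{Queue: "users_github", Payload: []byte("42")})
	if err != nil {
		fmt.Println("pipeline failed:", err)
		return
	}
	fmt.Println("queues visited:", visited)
	fmt.Println("avatars saved:", savedAvatars)
}
```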

msg-flow-1

Messagebus queues as a distributed pipeline

To continue from our previous example: as our website grows and we scale our instances, we could also offload (should we choose to) any work from workflows to a designated service or to our extended infrastructure at a service provider.

Similar to the previous scenario, on user registration, the messagebus consumer would need to know how to push the payloads to the queueing mechanism at the service provider (e.g. Amazon SQS / SNS). The infrastructure that we set up would then process all the payloads and use our specific messagebus producer (e.g. a Lambda handler written in Python) to send the payload to another Corteza instance via the API sink mechanism.

Corteza workflows also include a trigger for the API sink, so the payload would then be processed in another workflow, according to the business logic of the whole flow.

A diagram showing how pipelines can be created between different queueing mechanisms

msg-flow-1