When an SQL database makes a great Pub/Sub


If you compare MySQL or PostgreSQL with Kafka or RabbitMQ, at first they seem to be entirely different software. And usually that’s true, as you would use them for quite different tasks. What they have in common is processing streams of data; they just specialize in specific ways of doing it.

While Kafka and RabbitMQ are popular examples of Pub/Subs (also known as message queues or stream processing platforms), I’d like to share some patterns for using SQL databases as Pub/Subs as well. But shouldn’t we pick the right tool for the job? That’s right. However, SQL databases are readily available when you don’t have access to dedicated messaging infrastructure. They also have some unique features that turn out to be useful when working with message-driven applications.

I’m going to show three examples of such cases, with solutions based on the Watermill library for Go, although the patterns stay the same regardless of the language. But first, let’s ask the fundamental question.

What is a Pub/Sub?

If you generalize it enough, you can define any Pub/Sub client with just two methods (as the name suggests): one for publishing a new message to a topic, and the other for subscribing to messages from a topic.

Publish(topic, message)
Subscribe(topic) → stream of messages

So how do you implement this interface for an SQL database? The simplest way is to use INSERT statements for publishing and SELECT queries for consuming messages. More advanced subscribing patterns can be achieved with mechanisms like MySQL’s binlog or PostgreSQL’s LISTEN and NOTIFY commands. But for most use cases, the simple approach should be good enough.
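As a rough illustration of the simple approach, here is a minimal sketch using Go’s standard database/sql package. The events table, its schema, and the polling logic are illustrative assumptions, not Watermill’s actual storage format.

import (
	"database/sql"
	"time"
)

// publish inserts a message into an illustrative events table
// (assumed schema: id BIGINT AUTO_INCREMENT PRIMARY KEY, payload BLOB).
func publish(db *sql.DB, payload []byte) error {
	_, err := db.Exec(`INSERT INTO events (payload) VALUES (?)`, payload)
	return err
}

// subscribe polls for rows newer than the last seen id and emits their
// payloads on a channel, emulating a message stream.
func subscribe(db *sql.DB, messages chan<- []byte) {
	var lastID int64
	for {
		rows, err := db.Query(`SELECT id, payload FROM events WHERE id > ? ORDER BY id`, lastID)
		if err != nil {
			time.Sleep(time.Second)
			continue
		}
		for rows.Next() {
			var payload []byte
			if err := rows.Scan(&lastID, &payload); err != nil {
				break
			}
			messages <- payload
		}
		rows.Close()
		time.Sleep(time.Second) // naive polling interval
	}
}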

Using Watermill

The two-method pseudocode above is also the base idea behind Watermill’s Publisher and Subscriber interfaces. It turns out you can hide quite a lot of complexity behind this abstraction. To implement any Pub/Sub, all you have to do is implement these two methods. One of the supported Pub/Subs is the SQL Pub/Sub.
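For reference, the two interfaces from Watermill’s message package look roughly like this (check the current documentation, as details may change between versions):

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}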

All examples below include a short snippet of Watermill’s handler, which is a function that receives a message and can return new messages to be published.

func(msg *message.Message) ([]*message.Message, error)

See the Getting started guide for more details.

Pattern 1: Persistent Event Log

Message queues offer various features depending on their design. That’s why no single one fits all use cases.

One such feature is persistent storage. For example, Apache Kafka comes out of the box with support for storing messages on topics for as long as you need them. Others, like Google Cloud Pub/Sub, treat messages as ephemeral and delete them after they are consumed, or even after several days without being acknowledged.

You don’t always need to store messages, but it comes in useful when working with domain events. It allows processing events from the past again, sort of like “going back in time” (a similar approach to Event Sourcing, although much simplified).
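With the simple table-per-topic storage sketched earlier, “going back in time” can be as simple as re-reading the table from the first row. The events table and handler callback below are illustrative assumptions.

// replayAll re-reads every stored event from the illustrative events table
// and hands each payload to the given handler, oldest first.
func replayAll(db *sql.DB, handle func(payload []byte) error) error {
	rows, err := db.Query(`SELECT payload FROM events ORDER BY id`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var payload []byte
		if err := rows.Scan(&payload); err != nil {
			return err
		}
		if err := handle(payload); err != nil {
			return err
		}
	}
	return rows.Err()
}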

If you'd like to keep your current messaging software but need persistent storage for events, an SQL database might be the right choice.

Implementation

The Watermill handler for this pattern is the simplest one. It works just like a transparent proxy, moving messages from one system to the other. The Router component does all the work in the background. All you have to do is define the subscriber, publisher, and corresponding topics.

router.AddHandler(
	"googlecloud-to-mysql",  // unique handler name
	googleCloudTopic,        // subscriber's topic
	googleCloudSubscriber,   // subscriber
	mysqlTable,              // publisher's topic
	mysqlPublisher,          // publisher
	func(msg *message.Message) ([]*message.Message, error) {
		return []*message.Message{msg}, nil
	},
)
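For context, here is roughly how the mysqlPublisher used above can be constructed with the watermill-sql package. This is a sketch based on the package’s documented configuration, not a definitive listing; exact field names and the import path may vary between versions.

import (
	stdSQL "database/sql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/pkg/sql"
)

func newMySQLPublisher(db *stdSQL.DB, logger watermill.LoggerAdapter) (*sql.Publisher, error) {
	return sql.NewPublisher(
		db,
		sql.PublisherConfig{
			// The schema adapter decides how messages map to tables and rows.
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
}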

See full source: persistent-event-log example.

Pattern 2: Transactional Events

Consider user signup, a classic use case in every web application. The requirements are straightforward. You need to store the user details in an SQL table and publish a UserSignedUp event, so other services in your system can consume it and take some action (e.g., send a newsletter, update a statistics report, fill personal details in a CRM).

The issue is, you can’t execute an SQL statement and publish an event as a single atomic operation. You have two options:

• Save the user first, then publish the event: if publishing fails, the SQL table is up to date, but the rest of the system is inconsistent. The user can log in but doesn’t receive a newsletter, the monthly statistics report is incorrect, and your sales team doesn’t know whom to call. Other services that depend on the event will never receive it.

• Publish the event first, then save the user: if saving the user fails, the rest of the system has processed the event, but the SQL table is outdated. The user can’t log in but receives a newsletter. Your reports show more users than are actually using your product, and your sales team nags the frustrated user with calls. Let’s hope your “unsubscribe” button still works.

You can come up with complicated solutions for this problem, like two-phase commits or periodically synchronizing all systems. There’s also a more straightforward pattern using the SQL database.

The idea is to insert the event into a separate table instead of publishing it directly. The critical part is doing it within the same transaction as saving the user, which guarantees that both or neither are stored. In the background, a separate process is subscribed to the table with events and publishes all incoming rows to some message queue.
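A minimal sketch of the transactional part, using the standard database/sql package; the users and events table schemas are hypothetical, and the forwarding side is shown in the implementation below.

import "database/sql"

func signUpUser(db *sql.DB, email string, eventPayload []byte) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// Rollback is a no-op after a successful Commit.
	defer tx.Rollback()

	// Save the user...
	if _, err := tx.Exec(`INSERT INTO users (email) VALUES (?)`, email); err != nil {
		return err
	}
	// ...and store the UserSignedUp event in the same transaction.
	if _, err := tx.Exec(`INSERT INTO events (payload) VALUES (?)`, eventPayload); err != nil {
		return err
	}

	return tx.Commit()
}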

Implementation

When using Watermill, the handler function is very similar to the previous example. After all, it’s the same kind of event proxy, but this time the database is the entry point.

router.AddHandler(
	"mysql-to-kafka",
	mysqlTable,
	mysqlSubscriber,
	kafkaTopic,
	kafkaPublisher,
	func(msg *message.Message) ([]*message.Message, error) {
		return []*message.Message{msg}, nil
	},
)

See full source: transactional-events example.

Pattern 3: Synchronizing Databases

Synchronizing two databases isn’t a new problem. That’s how replication has worked for decades. It operates on low-level mechanisms (like the binary log), but the idea is very close to a Pub/Sub: one database publishes changes, and the other subscribes to them.

A somewhat harder task is moving data between two different database engines, each with a distinct table schema. If replication uses an interface similar to a Pub/Sub, can we use the same principles for migration as well?

Let’s consider migrating a MySQL table to PostgreSQL. It would look roughly like this:

1. The application inserts a new row into the MySQL table.
2. The synchronization service receives this as an update (because it’s subscribed to that table).
3. The service translates the row into the destination format (only if schemas differ, so this step is optional).
4. The service inserts a new row into the PostgreSQL table (publish).

Implementation

If the table schemas in both databases are the same, the handler can be as short as in the first two patterns. In this example, we deal with distinct tables, so we need to implement the translation part as well.

First of all, we need to rename the user column to username. Second, we merge the first_name and last_name columns into full_name.

You can see that the gob library is used for encoding and decoding the payloads, but you can use anything you like, as long as it serializes to a slice of bytes.

type mysqlUser struct {
	ID        int
	User      string
	FirstName string
	LastName  string
	CreatedAt time.Time
}

type postgresUser struct {
	ID        int
	Username  string
	FullName  string
	CreatedAt time.Time
}

router.AddHandler(
	"mysql-to-postgres",
	mysqlTable,
	mysqlSubscriber,
	postgresTable,
	postgresPublisher,
	func(msg *message.Message) ([]*message.Message, error) {
		// Decode the row coming from the MySQL table
		decoder := gob.NewDecoder(bytes.NewBuffer(msg.Payload))
		originUser := mysqlUser{}
		err := decoder.Decode(&originUser)
		if err != nil {
			return nil, err
		}

		// Translate from the MySQL schema to the PostgreSQL schema
		newUser := postgresUser{
			ID:        originUser.ID,
			Username:  originUser.User,
			FullName:  fmt.Sprintf("%s %s", originUser.FirstName, originUser.LastName),
			CreatedAt: originUser.CreatedAt,
		}

		// Encode the row to be saved in the PostgreSQL table
		var payload bytes.Buffer
		encoder := gob.NewEncoder(&payload)
		err = encoder.Encode(newUser)
		if err != nil {
			return nil, err
		}

		// Publish the translated row as a new message
		return []*message.Message{
			message.NewMessage(watermill.NewUUID(), payload.Bytes()),
		}, nil
	},
)
