Intro

This is part 2 of the “Realtime Data with Mariadb” series. If you haven’t had the chance to read part 1, I highly recommend reading it first.

WebSocket

Now that we can listen to database changes, we want to stream them to clients over WebSocket.

In this example I am using Micronaut’s WebSocket support, but you are free to choose any framework or even implement it yourself.

As usual all the code in this article will be available here

Project Setup

Let’s start by creating a Micronaut project.

mn create-app realtimemariadb --features=yaml && cd realtimemariadb

We are going to introduce new dependencies. Add these lines to your build.gradle:

 implementation ("org.mariadb.jdbc:mariadb-java-client:3.0.3") // MariaDB JDBC driver
 implementation ("org.springframework.boot:spring-boot-starter-jdbc:3.2.5")
 runtimeOnly("io.micronaut.sql:micronaut-jdbc-hikari")

We need those dependencies to run queries against our database.

The next step is to modify application.yml. Add these lines for the database config:

datasources:
  default:
    url: jdbc:mariadb://localhost:3306/aschema
    driverClassName: org.mariadb.jdbc.Driver
    host: localhost
    schema: aschema
    username: test
    password: test
    dialect: MYSQL
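
The host and schema keys repeat values that are already part of url, and the listener later in this article hardcodes port 3306. As a minimal sketch of an alternative, those values could be derived from the JDBC URL itself. JdbcUrlParts is a hypothetical helper that is not part of the article's source, and it assumes a simple single-host URL of the form jdbc:mariadb://host:port/schema.

```java
import java.net.URI;

// Hypothetical helper (not from the article's source code): splits a simple
// MariaDB JDBC URL into the host, port and schema the binlog client needs.
final class JdbcUrlParts {
    final String host;
    final int port;
    final String schema;

    private JdbcUrlParts(String host, int port, String schema) {
        this.host = host;
        this.port = port;
        this.schema = schema;
    }

    static JdbcUrlParts parse(String jdbcUrl) {
        if (!jdbcUrl.startsWith("jdbc:")) {
            throw new IllegalArgumentException("Not a JDBC URL: " + jdbcUrl);
        }
        // Dropping the "jdbc:" prefix leaves "mariadb://host:port/schema",
        // which java.net.URI can parse directly.
        URI uri = URI.create(jdbcUrl.substring("jdbc:".length()));
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("No single host found in: " + jdbcUrl);
        }
        // getPort() returns -1 when the URL has no explicit port; fall back to 3306.
        int port = uri.getPort() == -1 ? 3306 : uri.getPort();
        String path = uri.getPath() == null ? "" : uri.getPath();
        String schema = path.startsWith("/") ? path.substring(1) : path;
        return new JdbcUrlParts(uri.getHost(), port, schema);
    }
}
```

With the URL from the config above, JdbcUrlParts.parse("jdbc:mariadb://localhost:3306/aschema") yields host localhost, port 3306 and schema aschema. Multi-host URLs are outside the scope of this sketch.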

Database Listener

In the previous part, we created our database listener in Application.java. We are going to move it to a new file, because the listener will now implement Micronaut’s ApplicationEventListener interface. Listening for ServerStartupEvent makes our listener run after Micronaut has finished starting.

package realtimemariadb;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.runtime.server.event.ServerStartupEvent;
import jakarta.inject.Singleton;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.delete.Delete;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.update.Update;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

//this class will run after micronaut has finished starting
@Singleton
public class DatabaseListener implements ApplicationEventListener<ServerStartupEvent> {

    private final MainChannel mainChannel;
    private final DataRepository repository;
    private final DatasourceConfig datasourceConfig;

    public DatabaseListener(MainChannel mainChannel, DataRepository repository, DatasourceConfig datasourceConfig) {
        this.mainChannel = mainChannel;
        this.repository = repository;
        this.datasourceConfig = datasourceConfig;
    }

    private static final Logger LOG = LoggerFactory.getLogger(DatabaseListener.class);

    @Override
    public void onApplicationEvent(ServerStartupEvent event) {
        listen();
    }

    public void listen() {

        BinaryLogClient client = new BinaryLogClient(
                datasourceConfig.getHost(),
                3306,
                datasourceConfig.getSchema(),
                datasourceConfig.getUsername(),
                datasourceConfig.getPassword());

        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
        client.setEventDeserializer(eventDeserializer);
        client.registerEventListener(new BinaryLogClient.EventListener() {
            @Override
            public void onEvent(Event event) {
                if (event.getHeader().getEventType() == EventType.QUERY) {
                    QueryEventData queryEventData = event.getData();

                    // parse the query from the event
                    Statement statement;

                    try {
                        statement = CCJSqlParserUtil.parse(queryEventData.getSql());
                    } catch (JSQLParserException e) {
                        // skip statements the parser cannot handle instead of continuing with null
                        LOG.error("Error when parsing query event data: {}", e.getMessage(), e);
                        return;
                    }

                    // get query type so we can cast statement to the appropriate class
                    String queryType = statement.getClass().getSimpleName().toUpperCase();
                    String tableName;

                    switch (queryType) {
                        case "INSERT":
                            Insert insertStatement = (Insert) statement;
                            tableName = insertStatement.getTable().getName();
                            break;
                        case "DELETE":
                            Delete deleteStatement = (Delete) statement;
                            tableName = deleteStatement.getTable().getName();
                            break;
                        case "UPDATE":
                            Update updateStatement = (Update) statement;
                            tableName = updateStatement.getTable().getName();
                            break;
                        default:
                            // nothing to broadcast for statements we do not watch
                            LOG.info("unsupported query to watch {}", queryType);
                            return;
                    }

                    // broadcast event
                    DataEvent dataEvent = new DataEvent(tableName, repository.fetchData(tableName));
                    LOG.info("Data changes on table {}", tableName);
                    mainChannel.broadcast(dataEvent);
                }
            }
        });

        try {
            client.connect();
        } catch (IOException e) {
            LOG.error(e.getMessage());
        }
    }
}
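
The listener builds a DataEvent that is not shown in this article. The real class lives in the linked source code; as a rough sketch of what the listener and the Node.js client seem to expect, it could be a simple holder for the table name and the fetched rows. The row type List<Map<String, Object>> is an assumption here, as is the return type of repository.fetchData, and in a Micronaut project the class would also need whatever annotations your JSON serialization setup requires.

```java
import java.util.List;
import java.util.Map;

// Assumed shape of DataEvent (a sketch, not the article's actual class):
// the name of the table that changed plus the rows fetched after the change.
final class DataEvent {
    private final String tableName;
    private final List<Map<String, Object>> data;

    DataEvent(String tableName, List<Map<String, Object>> data) {
        this.tableName = tableName;
        // Copy the list so later changes by the caller do not alter the event.
        this.data = List.copyOf(data);
    }

    // Used by the channel to look up subscribers for this table.
    public String getTableName() {
        return tableName;
    }

    // Serialized as the "data" field that the client reads.
    public List<Map<String, Object>> getData() {
        return data;
    }
}
```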

WebSocket Server

We are going to make a very simple WebSocket server that handles new connections and any messages sent to our service.

WebSocketServer.java

package realtimemariadb;


import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import io.micronaut.websocket.annotation.ServerWebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@ServerWebSocket("/channel/{userName}")
public class WebSocketServer {

    private final MainChannel mainChannel;

    private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);

    public WebSocketServer(MainChannel mainChannel) {
        this.mainChannel = mainChannel;
    }

    @OnOpen
    public void onOpen(String userName, WebSocketSession session) {
        String msg = String.format("User %s joined!", userName);
        LOG.info(msg);
    }

    @OnMessage
    public void onMessage(String userName, String tableName, WebSocketSession session) {
        //every message that comes will be assumed as subscription
        Subscription subscription = new Subscription(session, tableName);
        LOG.info(String.format("User %s subscribing on table %s", userName, tableName));
        mainChannel.subscribe(subscription);
    }

    @OnClose
    public void onClose(String userName, WebSocketSession session)  {
        mainChannel.unsubscribe(session.getId());
    }

}

Then we are going to implement our channel, which will broadcast the changes to subscribed clients.

MainChannel.java

package realtimemariadb;

import jakarta.inject.Singleton;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@Singleton
public class MainChannel implements Channel {

    //in-memory store that records active subscriptions, keyed by table name;
    //it is concurrent because WebSocket threads and the binlog listener thread both use it
    private final Map<String, Set<Subscription>> subscriptions = new ConcurrentHashMap<>();

    public MainChannel() {}

    @Override
    public void subscribe(Subscription subscription) {
        //register active subscription; the set has to be mutable so that
        //further subscribers to the same table can be added later
        //TODO: need to check if table exists
        subscriptions
                .computeIfAbsent(subscription.getTableName(), k -> ConcurrentHashMap.newKeySet())
                .add(subscription);
    }

    //it only handles tableName for now
    @Override
    public void broadcast(DataEvent dataEvent) {
        if (dataEvent == null) {
            return;
        }
        //broadcast changes to subscribed users
        Set<Subscription> subscribed = subscriptions.get(dataEvent.getTableName());
        if (subscribed != null) {
            subscribed.forEach(s -> {
                //send to each client
                s.getWebsocketSession().sendSync(dataEvent);
            });
        }
    }

    @Override
    public void unsubscribe(String sessionId) {
        //when the user disconnects, remove that session from every table's subscribers
        //(session ids are Strings, so they are compared with equals)
        subscriptions.values().forEach(set -> set.removeIf(s -> s.getSessionId().equals(sessionId)));
    }
}
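
Because the channel keeps Subscription objects in a Set, how Subscription defines equality decides whether a client that sends the same table name twice ends up registered twice. The class is not shown in this article, so the following is only a sketch under assumptions: it keys equality on session id plus table name, and it uses a small stand-in Session interface in place of Micronaut's WebSocketSession so that it compiles on its own.

```java
import java.util.Objects;

// Stand-in for io.micronaut.websocket.WebSocketSession so this sketch is
// self-contained; only getId() matters for equality.
interface Session {
    String getId();
}

// Assumed shape of Subscription (not the article's actual class).
final class Subscription {
    private final Session websocketSession;
    private final String tableName;

    Subscription(Session websocketSession, String tableName) {
        this.websocketSession = Objects.requireNonNull(websocketSession);
        this.tableName = Objects.requireNonNull(tableName);
    }

    public Session getWebsocketSession() {
        return websocketSession;
    }

    public String getSessionId() {
        return websocketSession.getId();
    }

    public String getTableName() {
        return tableName;
    }

    // Two subscriptions are equal when the same session watches the same table,
    // so a Set keeps at most one entry per (session, table) pair.
    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof Subscription)) return false;
        Subscription that = (Subscription) other;
        return getSessionId().equals(that.getSessionId()) && tableName.equals(that.tableName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(getSessionId(), tableName);
    }
}
```

Without equals and hashCode, every message would create a distinct entry, and a client that subscribes to the same table twice would receive each change twice.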

Now try to run the code with

./gradlew :run

If everything is correct, you will see these messages

> Task :run
 __  __ _                                  _
|  \/  (_) ___ _ __ ___  _ __   __ _ _   _| |_
| |\/| | |/ __| '__/ _ \| '_ \ / _` | | | | __|
| |  | | | (__| | | (_) | | | | (_| | |_| | |_
|_|  |_|_|\___|_|  \___/|_| |_|\__,_|\__,_|\__|
14:54:44.097 [main] INFO  com.zaxxer.hikari.HikariDataSource - HikariPool-1 - Starting...
14:54:44.163 [main] INFO  com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection org.mariadb.jdbc.Connection@20a14b55
14:54:44.165 [main] INFO  com.zaxxer.hikari.HikariDataSource - HikariPool-1 - Start completed.
May 17, 2024 2:54:44 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at mylog-bin.000004/342 (sid:65535, cid:43)
<==========---> 80% EXECUTING [10s]
> :run

The client

Let’s build a simple WebSocket client in Node.js.

In the example below I am subscribing to the user table.

# create a node project and add 'ws' dependency
yarn init -y && yarn add ws

index.js

const WebSocket = require('ws');

const ws = new WebSocket('ws://localhost:8080/channel/UserNode');

ws.on('open', function open() {
  console.log('Connected to server');
  ws.send('user'); // subscribe to the `user` table
});

ws.on('message', function incoming(data) {
  //incoming data changes
  console.table(JSON.parse(data.toString("utf-8")).data);
});

ws.on('close', function close() {
  console.log('Disconnected from server');
});

Start the node project

node index.js

If the client is connected to our WebSocket server, you will see this message

15:00:47.585 [default-nioEventLoopGroup-1-2] INFO  realtimemariadb.WebSocketServer - User UserNode joined!
15:00:47.600 [default-nioEventLoopGroup-1-2] INFO  realtimemariadb.WebSocketServer - User UserNode subscribing on table user

Now try modifying your database.

realtime-data

Congrats, now you’ve built a realtime database service.

Source code