Intro
This is part 2 of the “Realtime Data with Mariadb” series. If you haven’t had the chance to read part 1, I highly recommend reading it first.
Websocket
Now that we’ve successfully listened to database changes, we want to stream those changes to clients over WebSocket.
In this example I am using Micronaut’s WebSocket feature, but you are free to choose any framework or even implement it yourself.
As usual all the code in this article will be available here
Project Setup
Let’s start by creating a Micronaut project.

    mn create-app realtimemariadb --features=yaml && cd realtimemariadb
We are going to introduce new dependencies. Add these lines to your build.gradle:

    implementation("org.mariadb.jdbc:mariadb-java-client:3.0.3")
    implementation("org.springframework.boot:spring-boot-starter-jdbc:3.2.5")
    runtimeOnly("io.micronaut.sql:micronaut-jdbc-hikari")
We need those dependencies to run queries to our database.
The next step is to modify application.yml. Add these lines for the database config:

    datasources:
      default:
        url: jdbc:mariadb://localhost:3306/aschema
        driverClassName: org.mariadb.jdbc.Driver
        host: localhost
        schema: aschema
        username: test
        password: test
        dialect: MYSQL
Database Listener
In the previous part, we created our database listener in the Application.java file. We are going to move it to a new file, since we are going to implement Micronaut’s ApplicationEventListener interface. This will run our listener after Micronaut has finished starting.
    package realtimemariadb;

    import com.github.shyiko.mysql.binlog.BinaryLogClient;
    import com.github.shyiko.mysql.binlog.event.Event;
    import com.github.shyiko.mysql.binlog.event.EventType;
    import com.github.shyiko.mysql.binlog.event.QueryEventData;
    import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
    import io.micronaut.context.event.ApplicationEventListener;
    import io.micronaut.runtime.server.event.ServerStartupEvent;
    import jakarta.inject.Singleton;
    import net.sf.jsqlparser.JSQLParserException;
    import net.sf.jsqlparser.parser.CCJSqlParserUtil;
    import net.sf.jsqlparser.statement.Statement;
    import net.sf.jsqlparser.statement.delete.Delete;
    import net.sf.jsqlparser.statement.insert.Insert;
    import net.sf.jsqlparser.statement.update.Update;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;

    @Singleton
    public class DatabaseListener implements ApplicationEventListener<ServerStartupEvent> {

        private static final Logger LOG = LoggerFactory.getLogger(DatabaseListener.class);

        private final MainChannel mainChannel;
        private final DataRepository repository;
        private final DatasourceConfig datasourceConfig;

        public DatabaseListener(MainChannel mainChannel,
                                DataRepository repository,
                                DatasourceConfig datasourceConfig) {
            this.mainChannel = mainChannel;
            this.repository = repository;
            this.datasourceConfig = datasourceConfig;
        }

        // Start listening once the Micronaut server has finished starting.
        @Override
        public void onApplicationEvent(ServerStartupEvent event) {
            listen();
        }

        public void listen() {
            BinaryLogClient client = new BinaryLogClient(
                    datasourceConfig.getHost(),
                    3306,
                    datasourceConfig.getSchema(),
                    datasourceConfig.getUsername(),
                    datasourceConfig.getPassword());

            EventDeserializer eventDeserializer = new EventDeserializer();
            eventDeserializer.setCompatibilityMode(
                    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
            client.setEventDeserializer(eventDeserializer);

            client.registerEventListener(new BinaryLogClient.EventListener() {
                @Override
                public void onEvent(Event event) {
                    if (event.getHeader().getEventType() != EventType.QUERY) {
                        return;
                    }
                    QueryEventData queryEventData = event.getData();

                    Statement statement;
                    try {
                        statement = CCJSqlParserUtil.parse(queryEventData.getSql());
                    } catch (JSQLParserException e) {
                        // Skip statements we cannot parse instead of continuing with null.
                        LOG.error(String.format("Error when parsing query event data : %s", e.getMessage()));
                        e.printStackTrace();
                        return;
                    }

                    String queryType = statement.getClass().getSimpleName().toUpperCase();
                    String tableName;
                    switch (queryType) {
                        case "INSERT":
                            Insert insertStatement = (Insert) statement;
                            tableName = insertStatement.getTable().getName();
                            break;
                        case "DELETE":
                            Delete deleteStatement = (Delete) statement;
                            tableName = deleteStatement.getTable().getName();
                            break;
                        case "UPDATE":
                            Update updateStatement = (Update) statement;
                            tableName = updateStatement.getTable().getName();
                            break;
                        default:
                            // No table to report for unsupported statements, so stop here.
                            System.out.println(String.format("unsupported query to watch %s", queryType));
                            return;
                    }

                    DataEvent dataEvent = new DataEvent(tableName, repository.fetchData(tableName));
                    System.out.println(String.format("Data changes on table %s", dataEvent.getData()));
                    mainChannel.broadcast(dataEvent);
                }
            });

            try {
                client.connect();
            } catch (IOException e) {
                LOG.error(e.getMessage());
            }
        }
    }
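The listener builds a DataEvent from a table name and the rows returned by repository.fetchData(tableName), but the class itself is not shown in this article. The sketch below is only a guess at its shape, based on how it is used: the getters getTableName() and getData() appear in the listener and channel code, and the Node client later reads a data property from the JSON message. The row type (a list of column-name-to-value maps) is an assumption, not something the article confirms.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of DataEvent; the real class is not shown in the article.
// Assumption: rows are represented as column-name -> value maps.
class DataEvent {
    private final String tableName;
    private final List<Map<String, Object>> data;

    DataEvent(String tableName, List<Map<String, Object>> data) {
        this.tableName = tableName;
        // Defensive copy so later changes to the caller's list do not leak in.
        this.data = List.copyOf(data);
    }

    // Name of the table whose contents changed.
    String getTableName() {
        return tableName;
    }

    // Current rows of that table, as fetched after the change.
    List<Map<String, Object>> getData() {
        return data;
    }
}
```

In a real Micronaut project the class would likely need to be public, and depending on the JSON setup it may also need a serialization annotation.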
Websocket Server
We are going to make a very simple WebSocket server that handles connection open and close events, and any messages that come to our service.
WebSocketServer.java
    package realtimemariadb;

    import io.micronaut.websocket.WebSocketSession;
    import io.micronaut.websocket.annotation.OnClose;
    import io.micronaut.websocket.annotation.OnMessage;
    import io.micronaut.websocket.annotation.OnOpen;
    import io.micronaut.websocket.annotation.ServerWebSocket;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    @ServerWebSocket("/channel/{userName}")
    public class WebSocketServer {

        private final MainChannel mainChannel;

        private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);

        public WebSocketServer(MainChannel mainChannel) {
            this.mainChannel = mainChannel;
        }

        @OnOpen
        public void onOpen(String userName, WebSocketSession session) {
            String msg = String.format("User %s joined!", userName);
            LOG.info(msg);
        }

        // The message body is the name of the table the client wants to watch.
        @OnMessage
        public void onMessage(String userName, String tableName, WebSocketSession session) {
            Subscription subscription = new Subscription(session, tableName);
            LOG.info(String.format("User %s subscribing on table %s", userName, tableName));
            mainChannel.subscribe(subscription);
        }

        @OnClose
        public void onClose(String userName, WebSocketSession session) {
            mainChannel.unsubscribe(session.getId());
        }
    }
Then we are going to implement our channel, which will broadcast the changes to subscribed clients.
MainChannel.java
    package realtimemariadb;

    import jakarta.inject.Singleton;

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    @Singleton
    public class MainChannel implements Channel {

        // Table name -> subscriptions. Concurrent collections are used because
        // WebSocket callbacks and binlog events may arrive on different threads.
        private final Map<String, Set<Subscription>> subscriptions = new ConcurrentHashMap<>();

        public MainChannel() {}

        @Override
        public void subscribe(Subscription subscription) {
            // Set.of(...) is immutable, so a second subscriber could not be added to it;
            // create a mutable concurrent set per table instead.
            subscriptions
                    .computeIfAbsent(subscription.getTableName(), k -> ConcurrentHashMap.newKeySet())
                    .add(subscription);
        }

        @Override
        public void broadcast(DataEvent dataEvent) {
            if (dataEvent == null || dataEvent.getTableName() == null) {
                return;
            }
            Set<Subscription> subscribed = subscriptions.get(dataEvent.getTableName());
            if (subscribed != null) {
                subscribed.forEach(s -> s.getWebsocketSession().sendSync(dataEvent));
            }
        }

        @Override
        public void unsubscribe(String sessionId) {
            // Compare ids with equals(); != on Strings compares references, not content.
            subscriptions.values().forEach(
                    set -> set.removeIf(s -> sessionId.equals(s.getSessionId())));
        }
    }
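The bookkeeping behind a channel like this can be tried out without Micronaut. The sketch below is a simplified stand-in, not the article's class: SubscriptionRegistry is a hypothetical name, and plain session-id strings take the place of Subscription objects. It shows the two details that matter most: each table needs a mutable set so that later subscribers can be added (Set.of(...) returns an immutable one), and session ids should be matched by value rather than by reference.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, framework-free stand-in for the channel's bookkeeping:
// session ids (plain strings here) grouped by the table they subscribed to.
class SubscriptionRegistry {
    private final Map<String, Set<String>> byTable = new ConcurrentHashMap<>();

    // Adds a session to the table's set, creating a mutable set on first use.
    void subscribe(String tableName, String sessionId) {
        byTable.computeIfAbsent(tableName, k -> ConcurrentHashMap.newKeySet())
               .add(sessionId);
    }

    // Removes the session from every table; Set.remove matches by equals().
    void unsubscribe(String sessionId) {
        byTable.values().forEach(ids -> ids.remove(sessionId));
    }

    // Returns an immutable snapshot of the sessions subscribed to a table.
    Set<String> subscribers(String tableName) {
        return Set.copyOf(byTable.getOrDefault(tableName, Set.of()));
    }
}
```

A broadcast would then iterate over subscribers(tableName) and send the event to each session.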
Now try to run the code with Gradle’s run task, for example ./gradlew run.
If everything is correct, you will see these messages
    > Task :run
     __  __ _                                  _
    |  \/  (_) ___ _ __ ___  _ __   __ _ _   _| |_
    | |\/| | |/ __| '__/ _ \| '_ \ / _` | | | | __|
    | |  | | | (__| | | (_) | | | | (_| | |_| | |_
    |_|  |_|_|\___|_|  \___/|_| |_|\__,_|\__,_|\__|
    14:54:44.097 [main] INFO com.zaxxer.hikari.HikariDataSource - HikariPool-1 - Starting...
    14:54:44.163 [main] INFO com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection org.mariadb.jdbc.Connection@20a14b55
    14:54:44.165 [main] INFO com.zaxxer.hikari.HikariDataSource - HikariPool-1 - Start completed.
    May 17, 2024 2:54:44 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
    INFO: Connected to localhost:3306 at mylog-bin.000004/342 (sid:65535, cid:43)
    <==========---> 80% EXECUTING [10s]
    > :run
The client
Let’s build a simple WebSocket client in Node.js.
In the example below I am subscribing to the user table.
    yarn init -y && yarn add ws
index.js
    const WebSocket = require('ws');

    const ws = new WebSocket('ws://localhost:8080/channel/UserNode');

    ws.on('open', function open() {
      console.log('Connected to server');
      ws.send('user');
    });

    ws.on('message', function incoming(data) {
      console.table(JSON.parse(data.toString("utf-8")).data);
    });

    ws.on('close', function close() {
      console.log('Disconnected from server');
    });
Start the Node project, for example with node index.js.
If the client is connected to our WebSocket server, you will see this message
    15:00:47.585 [default-nioEventLoopGroup-1-2] INFO realtimemariadb.WebSocketServer - User UserNode joined!
    15:00:47.600 [default-nioEventLoopGroup-1-2] INFO realtimemariadb.WebSocketServer - User UserNode subscribing on table user
Now try modifying your database.
Congrats, now you’ve built a realtime database service.
Source code