Intro

Recently I needed to build an app for work using Micronaut with Reactor. Fundamentally, Reactor lets your JVM application run in a non-blocking manner, which can increase its throughput.

The project needs to run database transactions, and since the documentation does not cover how to do that together with Micronaut’s Reactor core library, I am writing this post.

If you want to read the documentation on the Micronaut site, you can check this link.

The code

I am writing this in Groovy, but it will be similar if you write it in Java or other JVM languages.

package transactionvertx

import io.micronaut.context.event.ApplicationEventListener
import io.micronaut.runtime.server.event.ServerStartupEvent
import io.vertx.reactivex.mysqlclient.MySQLPool
import io.vertx.reactivex.sqlclient.SqlClient
import io.vertx.reactivex.sqlclient.SqlConnection
import io.vertx.reactivex.sqlclient.Transaction
import jakarta.inject.Singleton
import reactor.adapter.rxjava.RxJava2Adapter
import reactor.core.publisher.Mono

import java.time.Clock
import java.time.Instant
import java.time.LocalDateTime

@Singleton
class MainClass implements ApplicationEventListener<ServerStartupEvent> {

    private final MySQLPool pool

    MainClass(MySQLPool pool) {
        this.pool = pool
    }

    @Override
    void onApplicationEvent(ServerStartupEvent event) {
        // Borrow a connection from the pool, then begin a transaction on it
        pool.rxGetConnection().flatMap { c ->
            return c.rxBegin().flatMap { trx ->
                // Each flatMap runs the next query once the previous one has finished
                return c.query("INSERT INTO user(username, password) VALUES('username_1', 'passsword1')").rxExecute()
                    .flatMap { return c.query("INSERT INTO user(username, password) VALUES('username_2', 'passsword2')").rxExecute() }
                    .flatMap {
                        // Fail on purpose to trigger the rollback; the line below is never reached
                        throw new RuntimeException("Artificial error")
                        return c.query("INSERT INTO user(username, password) VALUES('username_3', 'passsword3')").rxExecute()
                    }
                    .flatMap { return c.query("INSERT INTO user(username, password) VALUES('username_4', 'passsword3')").rxExecute() }
                    .flatMap { return c.query("SELECT * FROM user").rxExecute() }
                    // Roll back if any step failed, commit otherwise
                    .doOnError {
                        trx.rollback()
                    }
                    .doOnSuccess { trx.commit() }
                    // Always return the connection to the pool
                    .doFinally { c.close() }
            }
        }.subscribe({
            // Success: print every row returned by the SELECT
            it.each { println(it.toString()) }
        }, { println(it.message) }) // Failure: print the error message
    }
}

Breakdown

pool.rxGetConnection().flatMap { c ->
    return c.rxBegin()
}

We start by getting a connection from the pool; if you already have a connection, you can use that instead. Inside .flatMap I use the connection and return another non-blocking call (another Single), c.rxBegin(), which begins the transaction.

return c.rxBegin().flatMap { trx -> }

Calling c.rxBegin() starts the transaction and gives us a Transaction object (trx) from the connection. We are going to use this object to control our transaction.

c.query("INSERT INTO user(username, password) VALUES('username_1', 'passsword1')").rxExecute()

Using the connection we got earlier, run the query as usual and call rxExecute() to get a Single that emits the query result.
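Since rxExecute() returns an RxJava 2 Single, it can be handed to Reactor code by wrapping it in a Mono. Below is a minimal sketch of that bridge, assuming the reactor-adapter library (which provides RxJava2Adapter) is on the classpath. The class ReactorBridgeSketch and the method listUsers are hypothetical names used only for illustration.

```groovy
import io.vertx.reactivex.sqlclient.Row
import io.vertx.reactivex.sqlclient.RowSet
import io.vertx.reactivex.sqlclient.SqlConnection
import reactor.adapter.rxjava.RxJava2Adapter
import reactor.core.publisher.Mono

// Hypothetical helper class, not part of the original project
class ReactorBridgeSketch {

    // Wraps the RxJava 2 Single returned by rxExecute() in a Reactor Mono.
    // The query only runs when something subscribes to the returned Mono.
    static Mono<RowSet<Row>> listUsers(SqlConnection c) {
        return RxJava2Adapter.singleToMono(c.query("SELECT * FROM user").rxExecute())
    }
}
```

The returned Mono can then be composed with the usual Reactor operators, or returned from a Micronaut controller method.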

.doOnError { trx.rollback() }
.doOnSuccess { trx.commit() }
.doFinally { c.close() }

Then finish the transaction by calling rollback or commit, depending on what happened in the transaction. One important note: always close() your connection when the transaction is finished, so that it is returned to the pool.
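The doOnError and doOnSuccess callbacks trigger the rollback or commit as side effects, and the chain does not wait for them to complete. A possible variation is to make the commit and rollback part of the chain itself, so that the subscriber only sees the result after the commit has finished. This is a sketch under assumptions: it relies on the Vert.x 4 RxJava 2 bindings, where rxCommit() and rxRollback() return a Completable, and the class TransactionSketch and the method insertAndList are hypothetical names.

```groovy
import io.reactivex.Single
import io.reactivex.SingleSource
import io.reactivex.functions.Function
import io.vertx.reactivex.mysqlclient.MySQLPool
import io.vertx.reactivex.sqlclient.Row
import io.vertx.reactivex.sqlclient.RowSet

// Hypothetical helper class, not part of the original project
class TransactionSketch {

    static Single<RowSet<Row>> insertAndList(MySQLPool pool) {
        return pool.rxGetConnection().flatMap { c ->
            c.rxBegin().flatMap { trx ->
                c.query("INSERT INTO user(username, password) VALUES('username_1', 'passsword1')").rxExecute()
                    .flatMap { c.query("SELECT * FROM user").rxExecute() }
                    // Commit first, then emit the rows once the commit has completed
                    .flatMap { rows -> trx.rxCommit().andThen(Single.just(rows)) }
                    // On failure, roll back, then re-emit the original error.
                    // The explicit cast picks the Function overload of onErrorResumeNext.
                    .onErrorResumeNext({ Throwable err ->
                        trx.rxRollback().onErrorComplete().andThen(Single.error(err))
                    } as Function<Throwable, SingleSource<RowSet<Row>>>)
            }
            // Return the connection to the pool whether or not the transaction started
            .doFinally { c.close() }
        }
    }
}
```

Placing doFinally on the outer step also closes the connection if rxBegin() itself fails.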

The full code is here.