Data Ingest with Reactive Streams in Java

Yongtze Chi
7 min read · Feb 14, 2021

In an earlier post, we looked at how to turn an imperative file-ingest Java program into one that uses the Java Stream API. To recap, the goal of the example program is to read contacts from a flat file, validate the data, and insert all valid contacts into the database.

We ended that post by highlighting a few shortcomings of the final program, in particular its less-than-ideal exception and resource handling.

In this post, I’d like to address these two problems with better code built on Reactive Streams, specifically the Reactor implementation.

Reactive Streams

By now, I’m sure most of us have heard of Reactive Streams, and maybe even used one of its many implementations. If you have not, here’s a good description from reactive-streams.org:

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

The programming API of Reactive Streams libraries is very similar to the Java Stream API. If you are already familiar with the Java Stream API, you will be able to start using a Reactive Streams library pretty quickly, although mastering it, like any other programming skill, will take time.

As alluded to earlier, Reactive Streams libraries offer richer functionality than the Java Stream API. The features we will highlight in this article are:

  1. Resource initialization and cleanup.
  2. Exception handling operators.
  3. Non-blocking asynchronous processing with back pressure.
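
The third item can be tried without any third-party library: since Java 9 the JDK ships the Reactive Streams interfaces as java.util.concurrent.Flow, together with a simple publisher, SubmissionPublisher. The sketch below is not Reactor code and is not part of the original program; the names BackPressureSketch and OneAtATime are made up for illustration. It shows a subscriber that signals demand one item at a time, which is the essence of back pressure.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BackPressureSketch {

    /** Hypothetical subscriber that requests exactly one item at a time. */
    static final class OneAtATime<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);          // initial demand: one item
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1);          // ask for the next item only when ready
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1, 2, 3 and returns what the subscriber received. */
    static List<Integer> run() {
        OneAtATime<Integer> subscriber = new OneAtATime<>();
        // close() (via try-with-resources) signals onComplete after pending items.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i);          // blocks if the subscriber's buffer is full
            }
        }
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }
}
```

Reactor's Flux and Mono implement the same Publisher contract, so the request-driven flow shown here is what happens underneath the operators used in the rest of this post.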

By the way, we’ll be using the Reactor library for our examples. The Reactor API provides two primary types we work with: Mono and Flux. Mono is a specialized stream of at most one item, and Flux represents a more generic stream of any length (closer in concept to Stream in the Java Stream API). You’ll see both types used in our sample program.

About The Program

The program we are going to write will do the following steps:

  1. Read lines from a flat file, convert each line into a Contact object.
  2. Validate each Contact object to make sure it meets certain requirements.
  3. For each Contact object that’s validated, insert its values into the contacts table.
  4. Summarize the results of the validation and database write, and return the result as a FileIngestResult object.

Reading Lines from Source File

Let’s start with reading lines from the source file as a stream. What we are trying to achieve is to initialize the resource we need when the stream starts, and close it when the stream is done or an exception is encountered.

In the Reactor API, this is done using the Flux.using() operator. All we need to do is provide a resource supplier closure, a closure that builds the data stream from the resource, and a resource cleanup closure.

private Flux<Tuple2<Long, String>> indexedLines(final File file) {
    return Flux.using(
        () -> Files.lines(file.toPath()),         // 1
        lines -> Flux.fromStream(lines).index(),  // 2
        BaseStream::close                         // 3
    );
}

  1. Open the line stream using Java NIO Files.lines() API.
  2. Convert the Java line Stream into a Reactive Stream of index and line content pairs.
  3. Close the Java line Stream on stream termination (normally or with exception).
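
To make the three-closure contract concrete, here is a plain-Java analogy. It is only a sketch: the using() helper below is hypothetical and is not how Reactor implements Flux.using() (Reactor defers all three steps until subscription and repeats them for every subscriber). What it does show is the guarantee we are after: the cleanup closure runs whether the body finishes normally or throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;

final class UsingSketch {

    /** Hypothetical helper mirroring the shape of Flux.using(); not Reactor code. */
    static <R, T> T using(Callable<R> resourceSupplier,
                          Function<R, T> body,
                          Consumer<R> cleanup) throws Exception {
        R resource = resourceSupplier.call();   // 1. acquire the resource
        try {
            return body.apply(resource);        // 2. use it
        } finally {
            cleanup.accept(resource);           // 3. always release it
        }
    }

    /** Records the order of events for a succeeding or failing body. */
    static List<String> demo(boolean fail) {
        List<String> events = new ArrayList<>();
        try {
            using(
                () -> { events.add("open"); return "resource"; },
                r -> {
                    if (fail) {
                        throw new IllegalStateException("boom");
                    }
                    events.add("use");
                    return r.length();
                },
                r -> events.add("close"));
        } catch (Exception e) {
            events.add("error: " + e.getMessage());
        }
        return events;
    }
}
```

In both runs of demo() the "close" event is recorded, which is the same property indexedLines() relies on to release the file handle.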

Those of you with a keen eye might naturally wonder: why can’t we just convert the Java line Stream into a Reactive Stream directly and return that?

Good question. The reason has to do with a subtle difference between Java Stream and Reactive Stream:

A Java Stream is meant to be used at most once, while a Reactive Stream can be subscribed to multiple times.

What does this mean? Well, first it means that if we are sourcing data from an I/O resource such as a file, we need to open the file every time the stream is consumed, and close it afterwards. More specifically to our code above, it means we have to open the line stream each time the stream is subscribed to, and close it after the stream terminates.

Had we just converted the line stream to a Reactive Stream type, the stream would work the first time we consumed it, but it would error out as soon as we consumed it a second time.

One could argue that for our purpose, we don’t need to consume from the stream multiple times. That’s true, but it’s important to learn the difference early on. If for some reason, we choose to not make the stream reusable, at least that is a conscious choice, and we know about the limitation.
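
The single-use restriction is easy to reproduce with the JDK alone. The following sketch (class and method names are made up for illustration) shows a second terminal operation on the same Stream failing with IllegalStateException, and then the usual remedy of wrapping stream creation in a Supplier so that each consumer gets a fresh stream. That is conceptually what the resource supplier passed to Flux.using() does for each subscription.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StreamReuseSketch {

    /** Returns true if consuming the same Stream a second time fails. */
    static boolean secondUseFails() {
        Stream<String> lines = Stream.of("header", "alice", "bob");
        lines.collect(Collectors.toList());       // first terminal operation: fine
        try {
            lines.collect(Collectors.toList());   // second one: stream already consumed
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** A Supplier creates a fresh Stream for every consumer. */
    static List<String> consumeTwice() {
        Supplier<Stream<String>> source = () -> Stream.of("header", "alice", "bob");
        List<String> first = source.get().skip(1).collect(Collectors.toList());
        List<String> second = source.get().skip(1).collect(Collectors.toList());
        if (!first.equals(second)) {
            throw new IllegalStateException("both passes should see the same data");
        }
        return second;
    }
}
```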

Map to Contact Object

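This step splits the tab-delimited line into columns, maps them to a Contact object, and wraps the result in a LineIngestResult together with its line number. Pretty straightforward.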

private LineIngestResult mapContact(final Tuple2<Long, String> indexedLine) {
    return new LineIngestResult(
        indexedLine.getT1() + 1,  // index() is zero-based; line numbers start at 1
        ContactFile.mapRow(indexedLine.getT2().split("\\t"))  // split on tab
    );
}
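
ContactFile.mapRow() is not shown in this post, so here is a minimal sketch of what such a mapping could look like. Everything in it is an assumption made for illustration: the Contact fields (firstName, lastName, email), the three-column layout, and the helper names. The real implementation lives in the linked repository and may differ.

```java
final class MapRowSketch {

    /** Hypothetical stand-in for the article's Contact class; the fields are assumed. */
    static final class Contact {
        final String firstName;
        final String lastName;
        final String email;

        Contact(String firstName, String lastName, String email) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.email = email;
        }
    }

    /** Splits one tab-delimited line; the limit -1 keeps trailing empty columns. */
    static String[] splitColumns(String line) {
        return line.split("\\t", -1);
    }

    /** Hypothetical mapRow: assumes exactly three columns in a fixed order. */
    static Contact mapRow(String[] columns) {
        if (columns.length != 3) {
            throw new IllegalArgumentException(
                "expected 3 columns, got " + columns.length);
        }
        return new Contact(columns[0], columns[1], columns[2]);
    }
}
```

One design detail worth knowing: String.split() without a limit drops trailing empty strings, so a line whose last field is empty yields fewer columns than expected. Passing -1 as the limit, as the sketch does, keeps them; whether that matters depends on how the real mapRow() treats short rows.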

Validate Contact Object

This step validates the Contact object according to some basic rules. It returns a LineIngestResult that’s partially populated with the input line number, Contact object, and the validation results.

private LineIngestResult validateContact(final LineIngestResult lineResult) {
    final long lineNumber = lineResult.getLineNumber();
    final Contact contact = lineResult.getContact();
    return new LineIngestResult(
        lineNumber,
        contact,
        ContactFile.validateContact(lineNumber, contact)
    );
}

Insert into Database

Inserting into the database requires use of resources that need to be initialized and closed. So, again, we’ll be using the Flux.using() operator to handle this.

private Flux<LineIngestResult> insertContact(final Flux<LineIngestResult> stream) {
    return Flux.using(
        this::initInsert,                                          // 1
        (resource) -> stream.map(
            lineResult -> insertContact(resource.getT2(), lineResult)),  // 2
        this::closeInsert                                          // 3
    );
}

  1. Call the initInsert() function to open a database connection, and prepare the insert statement for use later on.
  2. Build a Flux using the resource returned from initInsert(). We take the input stream and map each LineIngestResult to one that includes the database insert result, by calling the insertContact() function with the prepared insert statement and the LineIngestResult from the input stream.
  3. Close the resource when the stream terminates.

Here’s the code for the initInsert() function:

private Tuple2<Connection, PreparedStatement> initInsert() {
    try {
        final Connection connection = ContactDb.createConnection();
        final PreparedStatement insertStmt = ContactDb.prepareInsertStatement(connection);
        return Tuples.of(connection, insertStmt);
    } catch (SQLException e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}

It connects to the database, prepares the insert statement, and returns both the connection and the prepared statement as a Tuple2 object. Any SQLException is re-thrown as a RuntimeException, which will be handled by our handler described in the next section.

To actually insert the Contact object, we simply bind the values to the prepared statement, and execute it.

private LineIngestResult insertContact(final PreparedStatement insertStmt, final LineIngestResult lineResult) {
    if (lineResult.hasValidationError()) {
        return lineResult;
    } else {
        try {
            final int count = ContactDb.insertContact(insertStmt, lineResult.getContact());
            return new LineIngestResult(
                lineResult.getLineNumber(),
                lineResult.getContact(),
                lineResult.getValidationErrors(),
                count > 0,
                null);
        } catch (SQLException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}

Note that we don’t execute the insert unless the Contact object passed validation. And again, we re-throw SQLException as RuntimeException to be handled downstream.
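
The re-throw is needed because map() takes a java.util.function.Function, whose apply() method cannot throw checked exceptions such as SQLException. If the same try/catch starts to repeat, it can be factored into a small adapter. The sketch below is one possible way to do that, not something the original program uses; ThrowingFunction, unchecked(), and insertOrFail() are hypothetical names.

```java
import java.sql.SQLException;
import java.util.function.Function;

final class UncheckedSketch {

    /** Hypothetical functional interface whose apply() may throw SQLException. */
    @FunctionalInterface
    interface ThrowingFunction<T, R> {
        R apply(T input) throws SQLException;
    }

    /** Adapts a throwing function to a plain Function, keeping the cause. */
    static <T, R> Function<T, R> unchecked(ThrowingFunction<T, R> function) {
        return input -> {
            try {
                return function.apply(input);
            } catch (SQLException e) {
                // Same wrapping as in the article: message plus original cause.
                throw new RuntimeException(e.getMessage(), e);
            }
        };
    }

    /** Stand-in for a database call; fails for an empty name. */
    static Integer insertOrFail(String name) throws SQLException {
        if (name.isEmpty()) {
            throw new SQLException("name must not be empty");
        }
        return 1;
    }
}
```

With such an adapter, a mapping step could be written as stream.map(unchecked(...)), and the original SQLException would still be available downstream through getCause().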

Finally, to close the connection and prepared statement upon stream termination:

private void closeInsert(final Tuple2<Connection, PreparedStatement> resource) {
    closeSilently(resource.getT2());  // close the statement first
    closeSilently(resource.getT1());  // then the connection
}

private void closeSilently(final AutoCloseable resource) {
    try {
        resource.close();
    } catch (Exception ignored) {
    }
}

Exception Handling

Recall from the previous post that the Java Stream API does not provide native support for handling exceptions. It is up to us to decide how to handle any exceptions that might occur throughout the chain of operators.

Reactive Streams, and more specifically the Reactor API, offers a few handy operators to deal with exceptions that can happen at any stage of the chain.

  1. Emit an error: error()
  2. Emit an error if no item arrives within a given duration: timeout()
  3. Catch an exception and turn it into a default value: onErrorReturn()
  4. Catch an exception and fall back to another stream: onErrorResume()
  5. Catch an exception and convert it to another exception: onErrorMap()
  6. Execute some code on normal termination or exception: doFinally()
  7. Retry the operation on exception: retry()
  8. Retry the operation with a back-off strategy: retryWhen(Retry.backoff(…))

That’s just some of the operators available, and they will be adequate to handle most exception scenarios.

Looking back at our file ingestion sample, what we want to do when there’s an I/O exception is to capture the exception and terminate the stream. In other words, we want to stop on the first exception encountered, and report on it. We will use the onErrorResume() operator to achieve this effect.

private Flux<LineIngestResult> handleException(final Flux<LineIngestResult> stream) {
    return stream.onErrorResume(ex -> Flux.just(new LineIngestResult(ex)));
}

Whenever an error is signaled, onErrorResume() stops the original stream and substitutes the alternate stream for it. In this case, the alternate stream is a single LineIngestResult containing the exception. Since there’s only a single item, the stream terminates after that item is consumed.

Putting It All Together

Now that we have the pieces coded, we can glue them together using Reactor API’s operators.

public Mono<FileIngestResult> ingestFile(final File file) {
    return indexedLines(file)                  // 1
        .skip(1)                               // Ignore the header line.
        .map(this::mapContact)                 // 2
        .map(this::validateContact)            // 3
        .transform(this::insertContact)        // 4
        .transform(this::handleException)      // 5
        .reduce(new FileIngestResult(file), (fileResult, lineResult) ->
            fileResult.accumulate(
                lineResult.getValidationErrors(),
                lineResult.isInsertSucceed() != null && lineResult.isInsertSucceed(),
                lineResult.getException()
            )
        );                                     // 6
}

  1. Read lines from the source file, with line numbers.
  2. Map the line into a Contact object.
  3. Validate the Contact object.
  4. Insert a validated Contact object into the database.
  5. Handle any exception thrown in the preceding steps.
  6. Aggregate all LineIngestResult into a single FileIngestResult.

Notice that the ingestFile() function returns a Mono<FileIngestResult>. As you might recall, Mono is a stream of at most one data item. This means there’s only ever going to be at most one FileIngestResult representing the summary of ingestion result for the given file.
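
The reduce() in step 6 is a left fold: it starts from an initial value and combines it with each line result in turn, producing one final summary. The same idea can be sketched with the plain Java Stream API. Summary below is a hypothetical, much simplified stand-in for FileIngestResult that only counts successful and failed inserts; the real class also tracks validation errors and exceptions.

```java
import java.util.List;

final class ReduceSketch {

    /** Hypothetical, simplified stand-in for FileIngestResult. */
    static final class Summary {
        final int inserted;
        final int rejected;

        Summary(int inserted, int rejected) {
            this.inserted = inserted;
            this.rejected = rejected;
        }

        /** Returns a new Summary that includes one more line outcome. */
        Summary accumulate(boolean insertSucceeded) {
            return insertSucceeded
                ? new Summary(inserted + 1, rejected)
                : new Summary(inserted, rejected + 1);
        }

        /** Merges two partial summaries (required by Stream.reduce). */
        Summary merge(Summary other) {
            return new Summary(inserted + other.inserted, rejected + other.rejected);
        }
    }

    /** Folds per-line outcomes (true = inserted) into a single Summary. */
    static Summary summarize(List<Boolean> lineOutcomes) {
        return lineOutcomes.stream()
            .reduce(new Summary(0, 0), Summary::accumulate, Summary::merge);
    }
}
```

Reactor's reduce(initial, accumulator) needs no merge function because a Flux delivers its items sequentially; the combiner here exists only because Stream.reduce() must also support parallel streams.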

Summary

In this article, we looked at how to apply the Reactor API to ingest data from a flat file into the database. The style of the code looks very similar to the previous example using the Java Stream API, except that with Reactor we are able to handle resources and exceptions much more elegantly in the stream.

The complete source code of samples in this article can be accessed here: https://gitlab.com/yongtze/functional-file-ingest
