Data Ingest with Reactive Streams in Java

Reactive Stream

About The Program

Reading Lines from Source File

/**
 * Streams the lines of {@code file}, pairing each line with its position.
 *
 * <p>The file is opened when the returned {@link Flux} is subscribed to and
 * the underlying line stream is closed when the sequence terminates or is
 * cancelled.
 *
 * @param file the source file to read
 * @return a flux of (zero-based index, line text) tuples
 */
private Flux<Tuple2<Long, String>> indexedLines(final File file) {
    return Flux.using(
        // Resource supplier: open the line stream for this subscription.
        () -> Files.lines(file.toPath()),
        // Publisher factory: emit every line tagged with its index.
        lineStream -> {
            return Flux.fromStream(lineStream).index();
        },
        // Cleanup: release the file handle held by the line stream.
        BaseStream::close
    );
}

Map to Contact Object

/**
 * Converts one indexed line of the source file into a {@code LineIngestResult}.
 *
 * <p>The zero-based stream index is shifted by one to produce a one-based
 * line number. The line text is split into columns on the TAB character and
 * handed to {@code ContactFile.mapRow}.
 *
 * @param indexedLine tuple of (zero-based index, raw line text)
 * @return the ingest result holding the line number and the mapped contact
 */
private LineIngestResult mapContact(final Tuple2<Long, String> indexedLine) {
    final long lineNumber = indexedLine.getT1() + 1;
    // Split on an actual TAB. The previous pattern "\\\\t" compiled to the
    // regex \\t, i.e. a literal backslash followed by 't', which never
    // matches a tab-delimited line and left the whole row as one column.
    // NOTE(review): assumes the source file is tab-delimited - confirm.
    final String[] columns = indexedLine.getT2().split("\t");
    return new LineIngestResult(lineNumber, ContactFile.mapRow(columns));
}

Validate Contact Object

/**
 * Runs validation on the contact carried by {@code lineResult}.
 *
 * @param lineResult result of the mapping step for one line
 * @return a new result with the same line number and contact, plus whatever
 *         {@code ContactFile.validateContact} reports for them
 */
private LineIngestResult validateContact(final LineIngestResult lineResult) {
    final Contact parsedContact = lineResult.getContact();
    final long sourceLine = lineResult.getLineNumber();
    return new LineIngestResult(
        sourceLine,
        parsedContact,
        ContactFile.validateContact(sourceLine, parsedContact));
}

Insert into Database

/**
 * Wraps {@code stream} so that every element is passed through the
 * single-row insert step using one shared connection and statement.
 *
 * <p>The connection/statement pair is obtained from {@code initInsert} and
 * released through {@code closeInsert}.
 *
 * @param stream upstream line results
 * @return the line results after the insert step has been applied to each
 */
private Flux<LineIngestResult> insertContact(final Flux<LineIngestResult> stream) {
    return Flux.using(
        // Resource supplier: connection + prepared insert statement.
        this::initInsert,
        // Publisher factory: reuse the one statement for every line.
        connectionAndStatement -> {
            final PreparedStatement insertStmt = connectionAndStatement.getT2();
            return stream.map(lineResult -> insertContact(insertStmt, lineResult));
        },
        // Cleanup: close the statement and the connection.
        this::closeInsert
    );
}
/**
 * Opens a database connection and prepares the insert statement on it.
 *
 * <p>If preparing the statement fails after the connection was opened, the
 * connection is closed before the failure is rethrown so it is not leaked.
 *
 * @return tuple of (connection, prepared insert statement)
 * @throws RuntimeException wrapping the {@link SQLException} when the
 *         connection or the statement cannot be created
 */
private Tuple2<Connection, PreparedStatement> initInsert() {
    Connection connection = null;
    try {
        connection = ContactDb.createConnection();
        final PreparedStatement insertStmt = ContactDb.prepareInsertStatement(connection);
        return Tuples.of(connection, insertStmt);
    } catch (SQLException e) {
        // The connection is non-null only when statement preparation failed;
        // nobody else holds a reference to it, so it must be closed here.
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException closeFailure) {
                // Keep the original failure primary; record the close failure.
                e.addSuppressed(closeFailure);
            }
        }
        throw new RuntimeException(e.getMessage(), e);
    }
}
/**
 * Inserts the contact of a single line unless the line failed validation.
 *
 * @param insertStmt the prepared insert statement to execute
 * @param lineResult the validated line result
 * @return {@code lineResult} unchanged when it has a validation error;
 *         otherwise a new result whose insert flag is {@code true} when
 *         {@code ContactDb.insertContact} reported a count greater than zero
 * @throws RuntimeException wrapping the {@link SQLException} when the
 *         insert fails
 */
private LineIngestResult insertContact(final PreparedStatement insertStmt, final LineIngestResult lineResult) {
    // Lines that failed validation pass through untouched.
    if (lineResult.hasValidationError()) {
        return lineResult;
    }
    try {
        final boolean inserted =
            ContactDb.insertContact(insertStmt, lineResult.getContact()) > 0;
        return new LineIngestResult(
            lineResult.getLineNumber(),
            lineResult.getContact(),
            lineResult.getValidationErrors(),
            inserted,
            null);
    } catch (SQLException sqlFailure) {
        throw new RuntimeException(sqlFailure.getMessage(), sqlFailure);
    }
}
/**
 * Releases the insert resources: the statement first, then the connection.
 *
 * @param resource tuple of (connection, prepared insert statement)
 */
private void closeInsert(final Tuple2<Connection, PreparedStatement> resource) {
    final PreparedStatement insertStmt = resource.getT2();
    final Connection connection = resource.getT1();
    closeSilently(insertStmt);
    closeSilently(connection);
}
/**
 * Closes {@code resource}, discarding any exception raised while closing.
 *
 * @param resource the resource to close
 */
private void closeSilently(final AutoCloseable resource) {
    // The try-with-resources body is intentionally empty: the only work is
    // the implicit close() performed when the block exits.
    try (AutoCloseable closing = resource) {
    } catch (Exception ignored) {
        // Best-effort cleanup; a failure to close is deliberately ignored.
    }
}

Exception Handling

/**
 * Converts an error signal from {@code stream} into a regular element.
 *
 * @param stream upstream line results
 * @return a flux that, on error, emits one {@code LineIngestResult} built
 *         from the exception instead of propagating the error
 */
private Flux<LineIngestResult> handleException(final Flux<LineIngestResult> stream) {
    return stream.onErrorResume(failure -> {
        final LineIngestResult failedLine = new LineIngestResult(failure);
        return Flux.just(failedLine);
    });
}

Putting It All Together

/**
 * Ingests {@code file} line by line and folds the per-line outcomes into a
 * single {@code FileIngestResult}.
 *
 * <p>Pipeline: read indexed lines, drop the header, map each line to a
 * contact, validate it, insert it, turn an error into a result element, and
 * finally accumulate everything into the file-level result.
 *
 * @param file the source file to ingest
 * @return a mono producing the accumulated result for the whole file
 */
public Mono<FileIngestResult> ingestFile(final File file) {
    final Flux<LineIngestResult> lineResults = indexedLines(file)
        .skip(1) // Ignore the header line.
        .map(this::mapContact)
        .map(this::validateContact)
        .transform(this::insertContact)
        .transform(this::handleException);

    return lineResults.reduce(
        new FileIngestResult(file),
        (fileResult, lineResult) -> fileResult.accumulate(
            lineResult.getValidationErrors(),
            // A null flag (no insert attempted) counts as "not inserted".
            Boolean.TRUE.equals(lineResult.isInsertSucceed()),
            lineResult.getException()));
}

Summary

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store