A Practical Journey From Imperative to Functional in Java

Yongtze Chi
Published in The Startup
12 min read · Sep 13, 2020


Photo by Fabian9799 on Unsplash

The functional style of programming is becoming more accepted in mainstream practical programming, and for good reason. For one, functional programming offers tools that simplify complicated logic with more readable code.

Today, many mainstream programming languages, including Java, have incorporated these functional programming constructs. We don’t have to switch to a pure functional programming language in order to embrace functional style programming and benefit from it.

In this article, we’ll use a well-understood application to illustrate how functional programming can be applied to the data processing domain: we are going to ingest a tab-delimited file into a database, with some data validation and error reporting, using functional programming constructs in Java.

Full source code of examples in this article can be accessed here: https://gitlab.com/yongtze/functional-file-ingest

File Ingestion — The Imperative Way

To set the stage for the sort of code we are about to write, let’s look at the snippet below, written in Java in the conventional imperative style.

private FileIngestResult ingestFile(final File file) {
    long lineNumber = 0;
    long totalRowsIngested = 0;
    long totalErrorRows = 0;
    List<ValidationError> allValidationErrors = new ArrayList<>();
    try (
        final BufferedReader reader = new BufferedReader(new FileReader(file));
        final Connection connection = ContactDb.createConnection();
        final PreparedStatement insertStmt = ContactDb.prepareInsertStatement(connection);
    ) {
        String line = null;
        while ((line = reader.readLine()) != null) {
            lineNumber++;
            if (lineNumber == 1) {
                // Ignore the header line.
                continue;
            }

            // Split the tab-delimited line, map it to a Contact, and validate.
            final String[] tuple = line.split("\\t");
            final Contact contact = ContactFile.mapRow(tuple);
            final List<ValidationError> validationErrors = ContactFile.validateContact(lineNumber, contact);

            if (validationErrors.isEmpty()) {
                final int insertCount = ContactDb.insertContact(insertStmt, contact);
                totalRowsIngested += insertCount;
            } else {
                totalErrorRows++;
                allValidationErrors.addAll(validationErrors);
            }
        }

        return new FileIngestResult(
            file,
            FileIngestResult.Status.OK,
            lineNumber - 1, // total rows read, excluding the header
            totalRowsIngested,
            totalErrorRows,
            null,
            allValidationErrors
        );
    } catch (Exception e) {
        return new FileIngestResult(
            file,
            FileIngestResult.Status.ERROR,
            lineNumber - 1,
            totalRowsIngested,
            totalErrorRows,
            e,
            allValidationErrors
        );
    }
}

What the code is trying to achieve is to read all lines from a tab-delimited file, convert each line into a Contact object, validate the Contact object, and insert it into the contact table if validation passes. At the end, we return a summary of the file ingest: total rows read from the file, total rows ingested into the database, total rows with validation errors, all the validation errors, and any exception caught in the process.

Now, since this is a relatively simple program, the code organization isn’t that bad; we can still pick up what the code is doing pretty easily. However, the point of looking at the imperative code is to have a baseline to compare against the functional version of the same code.

Java Stream API

Before we look at the first functional example, let’s quickly recap the Java Stream API in case you are new to it. For those who are familiar with the API, feel free to skip this section.

We can think of the Java Stream API as an API for processing a collection of data using a set of standard “operators”. The Java Stream API views the data as a stream of data that has a beginning and an end, with a chain of operators stringed together to form a sequence of data transformation steps. When the chain of operators is executed, each piece of data is passed through the chain of operators until the result is collected at the end of the chain.

To look at a more concrete example, let’s say we want to iterate over a List of String and print each String in the List to standard output. Instead of coding this as a for loop, we use the forEach() operator.

List<String> names = Arrays.asList("John", "James", "Adam", "Mark");
names.stream()
    .forEach(name -> {
        System.out.println(name);
    });

The forEach() operator takes a closure or anonymous function which is called for each item in the stream. Notice we don’t need a for loop like in the imperative example above. With the Stream API, the control of the iteration is done by the API; we just supply closures that handle each item in the stream.

Let’s expand our simple code above using another operator (map) to convert the names to uppercase before printing them.

List<String> names = Arrays.asList("John", "James", "Adam", "Mark");
names.stream()
    .map(name -> name.toUpperCase())
    .forEach(upperCaseName -> {
        System.out.println(upperCaseName);
    });

The map() operator takes a closure that it calls for each item in the stream. Whatever value is returned from the closure is passed downstream to the next operator. Map is a commonly used operator for transforming data, for example, in our case above, to convert names to uppercase. Since forEach() comes after map(), it will get the uppercase names, instead of the original ones from the list.

What if, for whatever reason, we want to skip the first name in the stream? Easy, we just insert the skip() operator before map().

List<String> names = Arrays.asList("John", "James", "Adam", "Mark");
names.stream()
    .skip(1)
    .map(name -> name.toUpperCase())
    .forEach(upperCaseName -> {
        System.out.println(upperCaseName);
    });

The integer 1 we passed to skip() indicates how many items from the beginning of the stream we want to skip over. We can skip any number of items, not just 1.

What if we want to count the number of names, instead of printing them out? It turns out the Java Stream API provides a few operators for aggregating data in the stream. For example, the count() operator will count how many items there are in the stream at the point where the count() operator is inserted.

final List<String> names = Arrays.asList("John", "James", "Adam", "Mark");
final long count = names.stream() // count() returns a long
    .skip(1)
    .map(name -> name.toUpperCase())
    .count();
System.out.println("Count = " + count);

So, just like that, we can chain any number of operators to transform the stream of data from its original value to the target value we want.
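For instance, combining the operators above with filter() (another standard Stream operator we haven’t covered yet), we could count only the names that match a predicate:

final List<String> names = Arrays.asList("John", "James", "Adam", "Mark");
final long count = names.stream()
    .skip(1)                               // drop "John"
    .map(name -> name.toUpperCase())       // "JAMES", "ADAM", "MARK"
    .filter(name -> name.startsWith("M"))  // keep only "MARK"
    .count();                              // 1
System.out.println("Count = " + count);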

File Ingestion Using Java Stream API

For our first functional file ingestion example, we’ll use the Java Stream API. Let’s build up the program one step at a time.

First, we need to read all the lines in the given file as a Stream of String. The JDK Files class provides a convenience function (Files.lines()) for doing this.

private Stream<Pair<Long, String>> indexedLine(final File file) throws IOException {
    final AtomicLong lineNumber = new AtomicLong(0L);
    return Files.lines(file.toPath())
        .map(line -> Pair.of(lineNumber.incrementAndGet(), line));
}

The first thing you’ll notice when we use the Stream API is that we are relinquishing control of the iteration to the API. In other words, you won’t see a while loop in our code any more. Instead, we tell the Stream API how to generate the stream, how to process each data item in the stream, and how to collect the results, and the Stream API controls how the iteration is executed.

An immediate consequence of this is that we now need to find a different way to associate the correct line number with the line we just read from the file for error reporting purposes.

The way we solve it is by pairing an auto-incremented Long integer with the line we just read from the file. And now we have a Pair of Long and String, which represents the line number and the line content.

This is a very common functional programming technique where instead of having temporary variables to keep track of “state”, we capture all relevant states using some data structure (in this case Pair) and emit that as the output so that subsequent steps have access to these states.

#1: In functional programming, we emit all relevant states as output, instead of using shared variables.
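As an aside, Pair is not part of the JDK; the sample project supplies its own (libraries such as Apache Commons Lang offer one too). A minimal immutable sketch matching the of(), getFirst(), and getSecond() calls used in this article might look like this:

// A minimal immutable Pair, assumed here for illustration;
// the real one lives in the sample repository.
public final class Pair<A, B> {
    private final A first;
    private final B second;

    private Pair(final A first, final B second) {
        this.first = first;
        this.second = second;
    }

    public static <A, B> Pair<A, B> of(final A first, final B second) {
        return new Pair<>(first, second);
    }

    public A getFirst() {
        return first;
    }

    public B getSecond() {
        return second;
    }
}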

Next, we need to convert the line read from the file into a Contact object. This is a simple mapping function; however, remember that we need to capture all relevant states as output. We will create a new class, LineIngestResult, that has all the required properties for us to aggregate the ingestion results at the end.

public class LineIngestResult {
    private final long lineNumber;
    private final Contact contact;
    private final List<ValidationError> validationErrors;
    private final Boolean insertSucceed;
    private final Throwable exception;
    // Constructors and getters omitted for brevity.
    ...
}

It’s worth noting here that since functional programming encourages the use of immutable data structures, the Contact class has all properties marked as final, with values initialized through constructors only. There are getters as well to allow read access to these properties.

Why immutable data structures, you may ask? Well, in short, immutable data structures eliminate the possibility that properties will change after initialization, thereby making the code easier to reason about. But how do we transform data then?

Well, we transform data by creating a new copy of the input object, with the relevant properties changed in the new instance. By adopting this strict convention, we don’t have to worry about unintended modification of object properties after creation in downstream code.

#2: Prefer usage of immutable data structures over mutable ones. Transform the input object to the output object by copying and mutating at initialization of the output object.
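To make the convention concrete, here is a sketch of what an immutable Contact might look like (the field names are assumptions for illustration; the real class is in the repository):

public final class Contact {
    // All fields are final and set only in the constructor.
    private final String firstName;
    private final String lastName;
    private final String email;

    public Contact(final String firstName, final String lastName, final String email) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.email = email;
    }

    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }
    public String getEmail() { return email; }

    // "Transforming" a Contact produces a new instance instead of mutating this one.
    public Contact withEmail(final String newEmail) {
        return new Contact(firstName, lastName, newEmail);
    }
}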

The following function “maps” a line of the file into a Contact object, and finally to a LineIngestResult object.

private LineIngestResult mapContact(final Pair<Long, String> indexedLine) {
    final String[] tuple = indexedLine.getSecond().split("\\t");
    final Contact contact = ContactFile.mapRow(tuple);
    return new LineIngestResult(indexedLine.getFirst(), contact);
}

Next step, we need to validate the Contact object. The following function shows how we take the LineIngestResult object from the previous step, and transform it into a new LineIngestResult, with the validationErrors property initialized with the validation result.

private LineIngestResult validateContact(final LineIngestResult lineResult) {
    final long lineNumber = lineResult.getLineNumber();
    final Contact contact = lineResult.getContact();
    return new LineIngestResult(lineNumber, contact, ContactFile.validateContact(lineNumber, contact));
}

Next, we need to insert the Contact object into our database, if it passes validation. Again, we are still using the same convention of taking in a LineIngestResult object and producing a new one with the result of the insert execution. If validation did not pass, we simply return the previous LineIngestResult for aggregation downstream.

private LineIngestResult insertContact(final PreparedStatement insertStmt, final LineIngestResult lineResult) {
    if (lineResult.hasValidationError()) {
        return lineResult;
    } else {
        try {
            ContactDb.insertContact(insertStmt, lineResult.getContact());
            return new LineIngestResult(
                lineResult.getLineNumber(),
                lineResult.getContact(),
                lineResult.getValidationErrors(),
                true,
                null);
        } catch (SQLException e) {
            // Convert the checked SQLException to an unchecked exception
            // so it can propagate out of the stream pipeline.
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}

Now that we’ve got the individual steps coded, it’s time for the exciting part of gluing them together using the Java Stream API.

private FileIngestResult ingestFile(final File file) {
    try (
        final Connection connection = ContactDb.createConnection();
        final PreparedStatement insertStmt = ContactDb.prepareInsertStatement(connection);
    ) {
        return indexedLine(file)                                       // (1)
            .skip(1)                                                   // (2)
            .map(indexedLine -> mapContact(indexedLine))               // (3)
            .map(contact -> validateContact(contact))                  // (4)
            .map(lineResult -> insertContact(insertStmt, lineResult))  // (5)
            .reduce(new FileIngestResult(file),
                (FileIngestResult fileResult, LineIngestResult lineResult) ->
                    fileResult.accumulate(
                        lineResult.getValidationErrors(),
                        lineResult.isInsertSucceed(),
                        lineResult.getException()),
                (a, b) -> a);                                          // (6)
    } catch (Exception e) {
        return new FileIngestResult(
            file, FileIngestResult.Status.ERROR, 0, 0, 0, e, Collections.emptyList());
    }
}

In a very quick overview of the code above, here’s what each of the steps is doing:

  1. Read each line from the specified file, paired with a line number.
  2. Skip the first (header) line.
  3. Map the line to a LineIngestResult containing a Contact object.
  4. Validate the Contact object.
  5. Insert the Contact object if validation passes.
  6. Aggregate all LineIngestResult in the stream into a single FileIngestResult.

When looking at code written in this style, it’s always helpful to be mindful of the input and output of each step or operator in the chain.

Input/output data types of each step in the operator chain

Note that steps 3 to 5 take a LineIngestResult object as input and generate a new LineIngestResult. Recall from the functions we looked at above that each of these steps fills in more and more properties of LineIngestResult as data progresses through the chain.

Finally, the last reduce() operator is something we haven’t looked at prior to this point. The reduce() operator is another collector or terminal operator like count(). Instead of simply counting items, reduce() allows us to aggregate all LineIngestResult objects in the stream into a single FileIngestResult object representing the summary of the entire ingestion process.

The reduce() Operator

Let’s dive a bit deeper into the reduce() step and see how it works. If you recall from our imperative code, we accumulate totalRows, totalRowsIngested, totalErrorRows, and validationErrors as we ingest each line to construct a single FileIngestResult at the end. We use temporary variables to hold on to the partial states throughout the iteration.

Following convention #1 above of using functions that transform input to output instead of temporary variables, we need a different way of aggregating results. It turns out aggregation can be thought of as a series of function calls, each taking the current accumulated result and the current data item to produce a new accumulated result. This function is called the accumulator in the Java Stream API’s reduce() operator.
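To see the mechanics in isolation, here is reduce() summing a list of integers; each call to the accumulator takes the running total and the next item, and returns a new total:

final List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
final int sum = numbers.stream()
    .reduce(0, (total, n) -> total + n); // 0 -> 1 -> 3 -> 6 -> 10
System.out.println(sum); // prints 10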

The accumulator function in our example above is the second parameter in reduce() operator.

(FileIngestResult fileResult, LineIngestResult lineResult) ->
    fileResult.accumulate(
        lineResult.getValidationErrors(),
        lineResult.isInsertSucceed(),
        lineResult.getException())

Most of the logic has been moved inside FileIngestResult.accumulate(). Let’s take a look.

public FileIngestResult accumulate(
        final List<ValidationError> validationErrors,
        final boolean isIngested,
        final Throwable exception) {
    final boolean isError = (validationErrors != null && !validationErrors.isEmpty()) || exception != null;
    final Status status = this.status == Status.ERROR || isError
        ? Status.ERROR
        : Status.OK;
    final List<ValidationError> newValidationErrors = new LinkedList<>(this.validationErrors);
    if (validationErrors != null && !validationErrors.isEmpty()) {
        newValidationErrors.addAll(validationErrors);
    }
    return new FileIngestResult(
        this.file,
        status,
        this.totalRowsRead + 1,
        isIngested ? this.totalRowsIngested + 1 : this.totalRowsIngested,
        isError ? this.totalErrorRows + 1 : this.totalErrorRows,
        this.exception == null ? exception : this.exception,
        newValidationErrors
    );
}

Remember, the accumulator function takes the current accumulated result (FileIngestResult), the current data item (LineIngestResult), and produces a new FileIngestResult representing the new accumulated result. FileIngestResult is simply a collection of all data points we want to aggregate, and each one is accumulated in this function.

  1. status — set to ERROR if either the current FileIngestResult is ERROR, or the current LineIngestResult has an error. Otherwise, it’s set to OK.
  2. totalRowsRead — current totalRowsRead + 1
  3. totalRowsIngested — if the row was ingested, totalRowsIngested + 1; otherwise totalRowsIngested
  4. totalErrorRows — if ERROR then totalErrorRows + 1; otherwise totalErrorRows
  5. exception — the first non-null value of (current exception, exception from LineIngestResult)
  6. validationErrors — current validationErrors + validationErrors from LineIngestResult

Now that we understand how the accumulator function works, let’s look at the reduce() operator again. More specifically, here’s what each parameter of the reduce() operator means.

reduce(new FileIngestResult(file),                             // (1)
    (FileIngestResult fileResult, LineIngestResult lineResult) ->
        fileResult.accumulate(
            lineResult.getValidationErrors(),
            lineResult.isInsertSucceed(),
            lineResult.getException()),                        // (2)
    (a, b) -> a                                                // (3)
);
  1. Initial value of the aggregation.
  2. The accumulator function that’s called for each data item in the stream.
  3. The third parameter (the combiner) is not relevant in our example because it’s only required for parallel processing, and we are doing sequential processing. Hence, we supply a dummy function that simply returns the first argument.

Composing Functions

In essence, what the Stream API allows us to do is compose a sequence of smaller data transformation functions into a bigger construct that can take external input (a file in this example), perform the required transformations (convert to a domain object and validate), perform the desired side effects (data ingested into the database), and produce the expected output (FileIngestResult).

#3: Compose a sequence of smaller data transformation functions together to form a more complicated construct to transform data.
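The same idea exists outside of streams too; java.util.function.Function lets us compose small functions into bigger ones directly:

final Function<String, String> trim = String::trim;
final Function<String, Integer> length = String::length;

// Compose the two small functions into one.
final Function<String, Integer> trimmedLength = trim.andThen(length);

System.out.println(trimmedLength.apply("  hello  ")); // prints 5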

Summary

We looked at a few functional programming tools in our example to ingest data from a file into the database, using the Java Stream API. The Java Stream API provides the Stream abstraction, which is very useful for processing data. It allows us to combine many transformation steps in a chain of operators. Each step has a well-defined purpose of transforming one data item into its next state.

I believe this makes the code more organized and easier to read overall, compared to the imperative version. However, the usage of Java Stream API in our example here is not perfect, as it does not capture all the behaviors of the imperative version of the program.

Java Stream API Shortcomings

Most notably, exception handling seems to be left out of the Stream API. We have to resort to our own ways of handling exceptions thrown in each step of the operator chain. The way we’ve chosen to solve this is simply converting checked exceptions to unchecked ones so that they can be caught outside of the operator chain. A side effect of handling exceptions outside of the operator chain is that we can’t capture the state at the point the exception occurs for reporting purposes.
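A common way to do this conversion (a sketch of the general technique, not necessarily the exact helper used in the repository) is a small wrapper around a throwing function:

@FunctionalInterface
interface ThrowingFunction<T, R> {
    R apply(T t) throws Exception;
}

static <T, R> java.util.function.Function<T, R> unchecked(final ThrowingFunction<T, R> f) {
    return t -> {
        try {
            return f.apply(t);
        } catch (Exception e) {
            // Rethrow as unchecked so the exception can escape the operator chain.
            throw new RuntimeException(e.getMessage(), e);
        }
    };
}

With a helper like this, a step that throws a checked exception could be used in the chain as .map(unchecked(lineResult -> insertContact(insertStmt, lineResult))).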

Secondly, any resource initialization required, e.g. database connections and prepared statements, cannot be captured by the operator chain. You’ll notice that getting a database connection and preparing the insert statement are still done outside of the Stream API in the example. While this works, it would be ideal to let the stream initialize resources at the beginning and close them afterwards.
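For file handles specifically, the Stream returned by Files.lines() is AutoCloseable, so it can at least participate in try-with-resources; database resources, however, still have to be managed outside the chain:

try (final Stream<String> lines = Files.lines(file.toPath())) {
    // The underlying file handle is closed when the try block exits.
    lines.skip(1).forEach(System.out::println);
}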

There is a solution to these two problems using another streaming library. However, that’s beyond the scope of this article, and perhaps we can look into that in a future article.

Full source code of examples in this article can be accessed here: https://gitlab.com/yongtze/functional-file-ingest
