Java Stream

What is a stream?

A stream is an abstraction over data operations. It takes its input from collections, arrays, or I/O channels.

From imperative to declarative

For example, given a list of people, find the first 10 people aged 18 or younger.

The following solution is the imperative approach:

public void imperativeApproach() throws IOException {
        List<Person> people = MockData.getPeople();

        List<Person> peopleUnder18 = new ArrayList<>();
        for (Person person : people) {
            if (person.getAge() <= 18) {
                peopleUnder18.add(person);
                // Stop once the first 10 matches have been collected.
                if (peopleUnder18.size() == 10) {
                    break;
                }
            }
        }

        for (Person person : peopleUnder18) {
            System.out.print(person);
        }
}


The following is the declarative approach:

public void declarativeApproach() throws IOException {
        List<Person> people = MockData.getPeople();

        people.stream()
                // a lambda expression used as the filter predicate
                .filter(person -> person.getAge() <= 18)
                .limit(10)
                .collect(Collectors.toList())
                .forEach(System.out::print);
}

Abstraction

We mentioned that a stream is an abstraction over data manipulation. It abstracts the work in the following way (a minimal sketch follows the list):

  • Concrete: the source can be a Set, List, Map, etc.
  • Stream: operations such as filter, map, etc.
  • Concrete: collect the data to make it concrete again.
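
As a sketch of this concrete-stream-concrete flow (reusing the Person and MockData classes from the examples in this article): a List goes in, stream operations run in the middle, and a concrete Set comes out.

    public void concreteStreamConcrete() throws Exception {
        List<Person> people = MockData.getPeople();        // concrete input

        Set<String> namesUnder18 = people.stream()          // to a stream
            .filter(person -> person.getAge() <= 18)        // stream operation
            .map(Person::getFirstName)                      // stream operation
            .collect(Collectors.toSet());                   // back to a concrete Set

        System.out.println(namesUnder18);
    }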

Intermediate and Terminal Operations

Java streams have two kinds of operations:

  • Intermediate operations: map, filter, sorted
  • Terminal operations: collect, forEach, reduce

Each intermediate operation is lazily executed and returns a new stream; nothing is computed until a terminal operation is invoked.
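
A minimal sketch of this laziness (reusing the Person list from above): the print inside the filter does not run while the pipeline is being assembled, only when the terminal operation count() pulls elements through.

    public void lazinessDemo() throws Exception {
        Stream<Person> pipeline = MockData.getPeople().stream()
            .filter(person -> {
                // Nothing is printed while the pipeline is merely being built.
                System.out.println("filtering " + person);
                return person.getAge() <= 18;
            });

        System.out.println("pipeline built, nothing filtered yet");

        // The terminal operation triggers the whole pipeline.
        long count = pipeline.count();
        System.out.println(count);
    }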

Range

With IntStream.range(), you can create a stream over a fixed range of integers, for example:

    public void rangeIteratingLists() throws Exception {
        List<Person> people = MockData.getPeople();

        // Use an IntStream to loop through the list and print each object.
        IntStream.range(0, 10).forEach(i -> System.out.println(people.get(i)));

        // If you only want the first 10 elements, limit() reads more naturally.
        people.stream().limit(10).forEach(System.out::println);
    }

You can also apply a function iteratively a given number of times:

    public void intStreamIterate() throws Exception {
        // This is very much like the fold function in Kotlin:
        // it keeps applying the function you provide to the previous result.
        IntStream.iterate(0, operand -> operand + 1).limit(10).forEach(System.out::println);
    }

Max, Min and Comparators

Java streams provide built-in min/max functions that accept customized comparators. For example:

    public void min() throws Exception {
        final List<Integer> numbers = ImmutableList.of(1, 2, 3, 100, 23, 93, 99);

        // min() returns an Optional; get() is safe here because the list is non-empty.
        int min = numbers.stream().min(Comparator.naturalOrder()).get();

        System.out.println(min);
    }
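
max() works the same way and, like min(), accepts a custom comparator. A small sketch (using a hypothetical list of names, not part of the MockData set) that picks the longest string instead of using natural order:

    public void maxWithCustomComparator() throws Exception {
        final List<String> names = ImmutableList.of("Anna", "Christopher", "Bo");

        // Compare by string length instead of natural (alphabetical) order.
        String longest = names.stream()
            .max(Comparator.comparingInt(String::length))
            .get();

        System.out.println(longest); // Christopher
    }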


Distinct

Sometimes we would like to keep only the distinct elements of a stream; for that we can use the stream's distinct API:

  public void distinct() throws Exception {
    final List<Integer> numbers = ImmutableList.of(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 9, 9, 9);

    List<Integer> distinctNumbers = numbers.stream()
        .distinct()
        .collect(Collectors.toList());

    System.out.println(distinctNumbers);
  }


Filtering and Transformation

The stream filter API lets you keep only the elements that match a predicate, for example:

    public void understandingFilter() throws Exception {
        ImmutableList<Car> cars = MockData.getCars();

        // A Predicate is an assertion that returns true or false for each element.
        final Predicate<Car> carPredicate = car -> car.getPrice() < 20000;

        List<Car> carsFiltered = cars.stream()
            .filter(carPredicate)
            .collect(Collectors.toList());

        carsFiltered.forEach(System.out::println);
    }

The map API lets you transform each element into another form. For example, we can define another object type and map the given stream onto it:

    public void ourFirstMapping() throws Exception {
        // transform from one data type to another
        List<Person> people = MockData.getPeople();

        List<PersonDTO> dtos = people.stream()
            .map(p -> new PersonDTO(p.getId(), p.getFirstName(), p.getAge()))
            .collect(Collectors.toList());

        System.out.println(dtos);
    }

Group Data

One common operation in SQL queries is data grouping, for example:

SELECT COUNT(*), TYPE FROM JOB WHERE USER_ID = 123 GROUP BY TYPE

Java stream provides similar functionalities:

  public void groupingAndCounting() throws Exception {
    ArrayList<String> names = Lists
        .newArrayList(
            "John",
            "John",
            "Mariam",
            "Alex",
            "Mohammado",
            "Mohammado",
            "Vincent",
            "Alex",
            "Alex"
        );

    Map<String, Long> counting = names.stream()
        .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

    counting.forEach((name, count) -> System.out.println(name + " > " + count));
  }
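
To mirror the WHERE clause of the SQL above as well, a filter can run before the grouping. A sketch under the assumption of a hypothetical Job class with getUserId() and getType() accessors and a MockData.getJobs() helper (neither is defined in this article):

  public void groupJobsByType() throws Exception {
    List<Job> jobs = MockData.getJobs(); // hypothetical data source

    // SELECT COUNT(*), TYPE FROM JOB WHERE USER_ID = 123 GROUP BY TYPE
    Map<String, Long> countByType = jobs.stream()
        .filter(job -> job.getUserId() == 123)
        .collect(Collectors.groupingBy(Job::getType, Collectors.counting()));

    countByType.forEach((type, count) -> System.out.println(type + " > " + count));
  }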


Reduce and Flatmap

This is very similar to a Hadoop MapReduce job, where the map phase transforms the data and the reduce phase gathers the data and performs the final computation. For example:

  public void reduce() throws Exception {
    Integer[] integers = {1, 2, 3, 4, 99, 100, 121, 1302, 199};

    // Compute the sum of the elements, using 0 as the identity (initial) value.
    int sum = Arrays.stream(integers).reduce(0, (a, b) -> a + b);
    System.out.println(sum);

    // The same reduction using a method reference.
    int sum2 = Arrays.stream(integers).reduce(0, Integer::sum);
    System.out.println(sum2);
  }


flatMap differs from map in that it first flattens the nested structure of each element.

For example:

List<List<String>> list = Arrays.asList(
  Arrays.asList("a"),
  Arrays.asList("b"));
System.out.println(list);

System.out.println(list
  .stream()
  .flatMap(Collection::stream)
  .collect(Collectors.toList()));


The result is a single flattened List<String>.

The Consumption Function

When we decide how much to spend on consumption, which factors do we consider? In other words, what determines our consumption, and how does economics quantify this decision process? The consumption function proposed by Keynes holds that consumption depends only on current income, i.e. C = C0 + aY, where Y is income, C0 is the baseline (autonomous) consumption, and a is the marginal propensity to consume, the fraction of each additional unit of income that is spent. Keynes's formula has several important implications:

  • Consumption rises as income rises, but by a smaller proportion: the marginal propensity to consume is greater than 0 but less than 1.
  • The marginal propensity to consume itself falls as income grows.

Studies of how the propensity to consume varies with income in the short run confirmed this theory. According to Keynes, as income rises, a smaller share of the additional income is consumed, so consumption as a share of total income falls. This leads to a predicted dilemma: eventually less and less money would be spent on consumption. The problem is that this predicted dilemma never materialized; studies of the long-run propensity to consume showed that it does not change with income. In other words, Keynes's formula breaks down in the long run.

Many economists studied this puzzle. Fisher's work showed that when people decide how much to consume, they consider not only current income but also future income, and they use saving and borrowing to smooth consumption over time. For example, people borrow to consume earlier when they expect future income to be stable or growing, or save to support consumption later when they expect income to fall after retirement. In other words, people rationally adjust the balance between consumption and saving to keep long-run consumption as smooth as possible.

Fisher's model also introduces the effect of interest rates. In intertemporal choice, because of interest, future income is worth less than current income. To compare bundles of current and future consumption, Fisher's model introduces indifference curves: all consumption bundles on the same curve give the consumer the same satisfaction. An increase in income moves the consumer to a higher indifference curve, raising both current and future consumption. At the same time, the consumer faces an intertemporal budget constraint: current consumption is current income minus what is saved for the future, while future consumption is future income plus that saving. Once interest is taken into account, current plus future consumption equals current plus future income.

Building on Fisher's theory, Franco Modigliani proposed the life-cycle hypothesis. In Fisher's model, consumption depends on lifetime income; Modigliani further emphasized that income varies systematically over a person's life, and that people use saving to move income from high-income years to low-income years. His model adds personal wealth W to the Keynesian model. A consumer's total resources consist of initial wealth plus lifetime income; spreading them evenly over the remaining years gives the consumption function C = aW + bY, where a is the marginal propensity to consume out of wealth and b is the marginal propensity to consume out of income. The average propensity to consume then becomes C/Y = a(W/Y) + b. So when we look across individuals or at short-run data, wealth is roughly fixed, and higher income implies a lower average propensity to consume. In the long run, however, wealth grows, the consumption function shifts upward, and the average propensity to consume no longer falls as income rises.

Friedman offered another theory to explain the long-run consumption function. He assumed that current income can be split into two parts: permanent income and transitory income. Permanent income is the average income over a lifetime; transitory income is the random deviation around that average. Higher education, for example, raises permanent income, while luck produces different transitory income. Friedman's conclusion is that the consumption function can be approximated as C = aYp, where a is a constant measuring the fraction of permanent income that is consumed. The permanent-income hypothesis holds that the Keynesian consumption function uses the wrong variable, and that the average propensity to consume depends on the ratio of permanent income to current income. When current income temporarily rises above permanent income, the average propensity to consume temporarily falls; when it falls below, the average propensity to consume rises.

The evolution of the consumption function in economics reminds me of how the laws of physics are continually revised: an initial model or hypothesis is found not to apply in a new domain, a new model is proposed, and there are even attempts to use a single model to explain very different settings, such as the macroscopic and the microscopic, the classical and the quantum. Economic modeling works in a similar way: it is a continuous process of finding the factors that truly drive the outcome and revising the model in use. Studying economics has also made me appreciate the importance of mathematics; quantitative discussion and research are impossible without mathematical models.

Lambda Expression in Java/Kotlin

Higher-order function

In computer science, a higher-order function is a function that does at least one of the following:

  • Takes one or more functions as arguments
  • Returns a function as its result.

All other functions are first-order functions.

Anonymous class

In Java, an anonymous class lets you declare and instantiate a class at the same time. If you only need a local class once, an anonymous class is a good fit. For example, if you wish to define a Runnable to execute a task:

Executors.newSingleThreadExecutor().execute(new Runnable() {
    @Override
    public void run() {
        // Your task execution.
    }
});


As you can see in the example above, Runnable is an interface with a single method, run. The anonymous class implements that interface.

Lambda Expression

Besides the anonymous class, Java also supports anonymous functions, named Lambda Expression.

While an anonymous class lets you declare a new class inside a statement, it is not very concise when the class has only one method.

For the example in the section above, we can simplify the implementation with a lambda expression.

Executors.newSingleThreadExecutor().execute(() -> { /* Your task execution */ });


The lambda expression provides a few capabilities:

  • It lets you treat functionality as a method argument, or code as data (see the sketch after this list).
  • A function can be created without belonging to any class.
  • A lambda expression can be passed around as if it were an object and executed on demand.
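
To make "functionality as a method argument" concrete, here is a minimal Java sketch; the forEachMatching method and its names are purely illustrative, not a library API. The caller supplies both the condition and the action as lambda expressions.

    // A higher-order method: it receives behaviour (a Predicate and a Consumer) as arguments.
    public static <T> void forEachMatching(List<T> items, Predicate<T> condition, Consumer<T> action) {
        for (T item : items) {
            if (condition.test(item)) {
                action.accept(item);
            }
        }
    }

    // Usage: the behaviour itself is passed in as data.
    forEachMatching(Arrays.asList(1, 2, 3, 4, 5),
        n -> n % 2 == 0,                 // the condition
        n -> System.out.println(n));     // the action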

Meanwhile, functions are first-class citizens in Kotlin: they can be stored in variables and data structures, passed as arguments, and returned from other higher-order functions.

The Kotlin lambda expression follows this syntax:

  • It is always surrounded by curly braces.
  • Parameter declarations in full syntactic form go inside the curly braces and have optional type annotations.
  • The body goes after an -> sign.
  • If the inferred return type is not Unit, the value of the last expression inside the body is treated as the return value.

As you can tell, a lambda expression cannot specify its return type. If you wish to declare the return type explicitly, you can use an alternative: the anonymous function.

fun(x: Int, y: Int): Int = x + y


The major difference between Kotlin and Java is that Kotlin is a functional programming language. Kotlin has a dedicated type for functions, for example:

val initFunction: (Int) -> Int

The expression above means that initFunction has a function type: a function that takes an integer and returns an integer.

A function of that type can be written as:

val a = { i: Int -> i + 1 }

SRE: Data Integrity

 

Data integrity usually refers to the accuracy and consistency of data throughout its lifetime. For customer-facing online services, things get even more complex: any data corruption, data loss, or extended unavailability is a data integrity issue from the customer's point of view.

Data integrity can be a big problem in many cases. In one instance, a database table was corrupted and we had to spend a few hours restoring the data from a snapshot database. In another instance, data was accidentally deleted and the impact on our client was fatal, as the client never expected the data to become unavailable; restoring it was too expensive, so we had to fix the dependent data records and some code on the client side to mitigate the impact. In yet another instance, the data loaded for the client was not what they expected. This was clearly a data consistency issue, but it was not reproducible, which made it very hard for the team to debug.

There are many types of failure that can lead to a data integrity issue:

  • Root cause: user action, operator error, application bug, infrastructure defect, hardware failure, site disaster
  • Scope: wide; narrow and directed
  • Rate: big bang; slow and steady

Combining these factors yields 24 combinations of data integrity issues. How do we handle them?

The first layer of defense is to adopt soft deletion for client data. The idea behind soft deletion is to make sure the data is recoverable if needed, for example after an operator error. Soft deletion is usually implemented by adding an is_deleted flag and a deleted_at timestamp to the table. When data is to be deleted, it is not removed from the database immediately; instead it is marked as deleted, with the real purge scheduled for some time in the future, say 60 days after the deletion. This way, the deletion can be reverted if necessary.
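
Here is a minimal sketch of the idea in Java; the class, field, and method names are illustrative and not tied to any particular framework or schema:

    public class ClientRecord {
        private boolean isDeleted = false;  // the soft-delete flag
        private Instant deletedAt = null;   // when the record was soft deleted

        // Instead of removing the row, mark it; the real purge runs much later.
        public void softDelete() {
            this.isDeleted = true;
            this.deletedAt = Instant.now();
        }

        // Undo is possible at any point before the scheduled purge.
        public void restore() {
            this.isDeleted = false;
            this.deletedAt = null;
        }

        // Normal queries must filter out soft-deleted records.
        public boolean isVisible() {
            return !isDeleted;
        }

        // The purge job hard-deletes only records whose grace period has expired.
        public boolean isDueForPurge(Duration gracePeriod) {
            return isDeleted && deletedAt.plus(gracePeriod).isBefore(Instant.now());
        }
    }

A scheduled job could then call isDueForPurge(Duration.ofDays(60)) to decide which records to remove for real.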

There are different opinions about soft deletion, as it introduces extra complexity in data management. For example, when there are hierarchies and dependency relationships between data records, a soft deletion might break data constraints. It also makes data selection and other operations more complex, because a filter has to be applied everywhere to exclude records that have been soft deleted. Recovering soft-deleted data can be complex as well, especially when only part of the data was deleted and the recovery involves a non-trivial data merge.

The second layer of defense is to build a data backup system and make the recovery process fast. We need to be careful here: backups and archives are not themselves the goal of data integrity. Finding ways to prevent data loss, to detect data corruption, and to recover quickly from a data integrity incident matters more. Backups are often neglected because they yield no visible benefit and are never a high priority for anyone; building a restore system is a much more useful goal.

For many cloud services, data backup is available as an option. For example, AWS RDS supports creating snapshots, and a cloud-hosted Redis cluster can back up its data to EBS storage. Many people stop here, assuming the data is now safely backed up. However, recovery can take a long time to finish, and data integrity is effectively broken during that window, so recovery time should be an important metric for the system.

Besides backups, many systems use replicas, and by failing over to a replica when the primary node has an issue they improve availability. We need to keep in mind that the data may not be fully consistent between the primary instance and the replica.

A third layer is to detect errors earlier. For example, run a data validation job that checks the integrity of the data across different storage systems, so that an issue can be fixed quickly when it happens.
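
As a sketch of what such a validation job might look like, here is a comparison of record IDs and per-record checksums between two stores; the DataStore interface and its methods are assumptions made for illustration, not an existing API.

    public interface DataStore {
        Set<String> recordIds();
        String checksum(String recordId);
    }

    // Report records that are missing from the backup or whose contents diverge.
    public static List<String> findInconsistencies(DataStore primary, DataStore backup) {
        List<String> problems = new ArrayList<>();
        Set<String> backupIds = backup.recordIds();

        for (String id : primary.recordIds()) {
            if (!backupIds.contains(id)) {
                problems.add("missing in backup: " + id);
            } else if (!primary.checksum(id).equals(backup.checksum(id))) {
                // Same record, different content: a consistency violation to investigate.
                problems.add("checksum mismatch: " + id);
            }
        }
        return problems;
    }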