By tkruse


2019-06-10 15:59:09 8 Comments

There is a question on whether Java methods should return Collections or Streams, in which Brian Goetz answers that even for finite sequences, Streams should usually be preferred.

But it seems to me that currently many operations on Streams that come from other places cannot be safely performed, and defensive code guards are not possible because Streams do not reveal if they are infinite or unordered.

If parallelism were a problem for the operations I want to perform on a Stream, I can call isParallel() to check, or sequential() to make sure the computation is sequential (if I remember to).

But if orderedness or finiteness (sizedness) were relevant to the safety of my program, I cannot write safeguards.
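For illustration, the parallelism guard described above could look roughly like the following. This is only a sketch: `SequentialGuard` and `drainSequentially` are made-up names, and only `isParallel()`, `sequential()` and `forEachOrdered()` are actual `Stream` methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class SequentialGuard {

    // Hypothetical helper: consumes a stream on the calling thread,
    // no matter how the producer configured it.
    public static <T> List<T> drainSequentially(Stream<T> stream) {
        // sequential() switches the whole pipeline to sequential mode;
        // the isParallel() check is only here to show the query method.
        Stream<T> s = stream.isParallel() ? stream.sequential() : stream;
        List<T> out = new ArrayList<>();
        s.forEachOrdered(out::add); // single-threaded, encounter order
        return out;
    }

    public static void main(String[] args) {
        List<Integer> result = drainSequentially(Stream.of(1, 2, 3).parallel());
        System.out.println(result); // prints [1, 2, 3]
    }
}
```

Nothing comparable exists as a plain method on Stream for "is this ordered?" or "is this finite?", which is the gap the question is about.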

Assuming I consume a library implementing this fictitious interface:

public interface CoordinateServer {
    public Stream<Integer> coordinates();
    // example implementations:
    // IntStream.range(0, 100).boxed()   // finite, ordered, sequential
    // final AtomicInteger atomic = new AtomicInteger();
    // Stream.generate(() -> atomic.incrementAndGet()) // infinite, unordered, sequential
    // Stream.generate(() -> atomic.incrementAndGet()).parallel() // infinite, unordered, parallel
}

Then what operations can I safely call on this stream to write a correct algorithm?

It seems that if I want to write the elements to a file as a side effect, I need to be concerned about the stream being parallel:

// if the stream is parallel, in which order will elements be written to the file?
coordinates().peek(i -> writeToFile(i)).count();
// how should I remember to always add sequential() in such cases?

And if it is parallel, which thread pool does it run on?

If I want to sort the stream (or other non-short-circuit operations), I somehow need to be cautious about it being infinite:

coordinates().sorted().limit(1000).collect(toList()); // will this terminate?
coordinates().allMatch(x -> x > 0); // will this terminate?

I can impose a limit before sorting, but which magic number should that be, if I expect a finite stream of unknown size?

Finally maybe I want to compute in parallel to save time and then collect the result:

// will result list maintain the same order as sequential?
coordinates().map(i -> complexLookup(i)).parallel().collect(toList());

But if the stream is not ordered (in that version of the library), then the result might become mangled due to the parallel processing. But how can I guard against this, other than not using parallel (which defeats the performance purpose)?

Collections are explicit about being finite or infinite, about having an order or not, and they do not carry the processing mode or threadpools with them. Those seem like valuable properties for APIs.

Additionally, Streams may sometimes need to be closed, but most commonly not. If I consume a stream from a method (or from a method parameter), should I generally call close?
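To make the closing question concrete: Stream is AutoCloseable, so "always close" would mean wrapping every consumed stream in try-with-resources, roughly as sketched below. `CloseDemo` and `countAndClose` are made-up names. As far as I can tell, closing a stream that holds no resources simply runs no close handlers.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class CloseDemo {

    // Hypothetical helper: consumes the stream and closes it afterwards,
    // whether or not it actually holds a resource.
    public static long countAndClose(Stream<?> stream) {
        try (Stream<?> s = stream) {
            return s.count();
        }
    }

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean();
        // onClose registers a handler that runs when close() is called
        Stream<String> source = Stream.of("a", "b").onClose(() -> closed.set(true));
        System.out.println(countAndClose(source)); // prints 2
        System.out.println(closed.get());          // prints true
    }
}
```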

Also, streams might already have been consumed, and it would be good to be able to check for that up front and handle the case gracefully.

I would wish for some code snippet that can be used to validate assumptions about a stream before processing it, like:

Stream<X> stream = fooLibrary.getStream();
// StreamPreconditions.check is the helper I wish existed; it is not a real API
Stream<X> safeStream = StreamPreconditions.check(
    stream,
    /* max threshold of elements before IllegalArgumentException */
    10_000,
    /* fail with IllegalArgumentException if not ordered */
    true
    );

1 comments

@orirab 2019-06-26 09:28:14

After looking into this a bit (some experimentation and here), as far as I can see there is no way to know definitively whether a stream is finite or not.

More than that, sometimes it is not even determined until runtime (for example, in Java 11: IntStream.generate(() -> 1).takeWhile(x -> externalCondition(x))).

What you can do is:

  1. You can find out with certainty if it is finite, in a few ways (note that a negative result from these does not mean the stream is infinite, only that it may be):

    1. stream.spliterator().getExactSizeIfKnown() - if the stream has a known exact size, it is finite; otherwise this returns -1.

    2. stream.spliterator().hasCharacteristics(Spliterator.SIZED) - returns true if the stream is SIZED.

  2. You can safeguard yourself by assuming the worst (depending on your case).

    1. stream.sequential()/stream.parallel() - explicitly set your preferred consumption type.
    2. With a potentially infinite stream, assume the worst case in each scenario.

      1. For example, assume you want to listen to a stream of tweets until you find one by Venkat - it is a potentially infinite operation, but you'd like to wait until such a tweet is found. In this case, simply go for stream.filter(tweet -> isByVenkat(tweet)).findAny() - it will iterate until such a tweet comes along (or forever).
      2. A different scenario, and probably the more common one, is wanting to do something on all the elements, or to try only a certain number of times (similar to a timeout). For this, I'd recommend always calling stream.limit(x) before calling your operation (collect or allMatch or similar), where x is the number of tries you're willing to tolerate.
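A rough sketch of the checks from point 1 follows. One thing to keep in mind is that spliterator() is itself a terminal operation, so the original stream cannot be used afterwards; the sketch re-wraps the spliterator with StreamSupport.stream so processing can continue. `StreamInspection`, `Inspected` and `inspect` are names I made up for the example; the Spliterator and StreamSupport calls are the real JDK ones.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamInspection {

    /** Hypothetical holder: the re-wrapped stream plus what its spliterator reported. */
    public static final class Inspected<T> {
        public final Stream<T> stream;
        public final boolean sized;
        public final boolean ordered;
        public final long exactSize; // -1 when the size is not known

        Inspected(Stream<T> stream, boolean sized, boolean ordered, long exactSize) {
            this.stream = stream;
            this.sized = sized;
            this.ordered = ordered;
            this.exactSize = exactSize;
        }
    }

    public static <T> Inspected<T> inspect(Stream<T> source) {
        boolean parallel = source.isParallel();
        // Terminal operation: 'source' must not be used for processing after this line.
        Spliterator<T> sp = source.spliterator();
        boolean sized = sp.hasCharacteristics(Spliterator.SIZED);
        boolean ordered = sp.hasCharacteristics(Spliterator.ORDERED);
        long exactSize = sp.getExactSizeIfKnown();
        // Build a fresh stream over the same spliterator, keeping the parallel flag
        // and forwarding close() to the original stream.
        Stream<T> rebuilt = StreamSupport.stream(sp, parallel).onClose(source::close);
        return new Inspected<>(rebuilt, sized, ordered, exactSize);
    }

    public static void main(String[] args) {
        Inspected<Integer> finite = inspect(Arrays.asList(1, 2, 3).stream());
        System.out.println(finite.sized + " " + finite.ordered + " " + finite.exactSize);

        Inspected<Integer> unknown = inspect(Stream.generate(() -> 1));
        System.out.println(unknown.sized + " " + unknown.ordered + " " + unknown.exactSize);
    }
}
```

A false value for sized only means "unknown". For instance, a filtered list stream is finite but no longer reports SIZED.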

After all this, I'll just mention that I think returning a stream is generally not a good idea, and I'd try to avoid it unless there are large benefits.

@tkruse 2019-06-26 10:01:33

.spliterator() is a method I think, not a public field. Also, you can copy the check for SIZED to check for ORDERED, I guess? I think it should be possible to keep a counter of the elements as the stream is processed, so that even for potentially infinite streams, it can throw an exception if more elements are emitted than I maximally expected (at a performance cost, of course). Otherwise, nice answer so far.

@orirab 2019-06-26 10:09:23

spliterator - correct. ordered - the problem is that it can be ordered only if it is finite, otherwise it will take forever (for example, Stream.generate(random::nextInt).sorted() will cause an IntelliJ warning), so checking for ordered is a bit redundant. Instead of keeping a counter and incrementing it yourself, why not use limit(x) as the maximum?

@tkruse 2019-06-26 12:16:41

limit() does not tell you that there were more elements. E.g., when calling max() on a very long stream that might be infinite, it is safer to throw an exception than to return a wrong number.
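A minimal sketch of the counter idea, as opposed to limit(x): it fails loudly once more than the expected number of elements has passed through. `BoundedStreams` and `failAfter` are made-up names, and the choice of IllegalStateException is arbitrary.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public class BoundedStreams {

    // Hypothetical helper: passes elements through unchanged, but throws
    // as soon as element number maxElements + 1 is seen.
    public static <T> Stream<T> failAfter(Stream<T> stream, long maxElements) {
        AtomicLong seen = new AtomicLong(); // atomic, so it also counts correctly in parallel
        return stream.peek(e -> {
            if (seen.incrementAndGet() > maxElements) {
                throw new IllegalStateException(
                        "stream emitted more than " + maxElements + " elements");
            }
        });
    }

    public static void main(String[] args) {
        // Within the bound: behaves like the plain stream.
        System.out.println(failAfter(Stream.of(1, 2, 3), 5).max(Integer::compare).get()); // prints 3

        // Infinite source: throws instead of running forever or returning a wrong maximum.
        try {
            failAfter(Stream.iterate(1, i -> i + 1), 100).max(Integer::compare);
        } catch (IllegalStateException expected) {
            System.out.println("aborted: " + expected.getMessage());
        }
    }
}
```

One caveat: since Java 9, count() may skip peek() entirely when the size is already known from the source, but in that case the size can be checked up front anyway.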

@orirab 2019-06-26 12:22:23

I'm not too sure of that - it very much depends on your use-case, but I see your point.

@orirab 2019-06-26 21:02:51

If this is a fitting answer, would you consider accepting it?
