RxJava – Observable.interval()

package rxjava;
 
import java.util.Random;
import java.util.concurrent.TimeUnit;
 
import rx.Observable;
import rx.Observer;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
 
public class RxInterval {
 
	private final Random random = new Random();
 
	public RxInterval() {
 
		Observable<Integer> observable = Observable
		.interval(5, TimeUnit.SECONDS)
		/**
                 * startWith()
		 * do Observable dodawany jest element startowy, który emitowy jest przed pierwszym głównym, 
		 * a między nimi nie ma zwłoki - efekt jest taki, że pierwszego Integer'a dostajemy od razu, 
		 * a nie po 5 sekundach
		 */
		.startWith(0l) 
		.map(new Func1<Long, Integer>() {
			@Override
			public Integer call(Long t) {
				return random.nextInt();
			}
		});
 
		Observer<Integer> observer = new Observer<Integer>() {
 
			/**
			 * observable skonczyl emisje
			 */
			@Override
			public void onCompleted() {
			}
 
			/**
			 * observable gdzies sie wysypal
			 */
			@Override
			public void onError(Throwable e) {
				// TODO Auto-generated method stub
 
			}
 
			/**
			 * nowy element z observable
			 */
			@Override
			public void onNext(Integer t) {
				System.out.println(t);
			}
		};
 
		observable
                .subscribeOn(Schedulers.immediate())
                .observeOn(Schedulers.immediate())		
		.subscribe(observer);
	}
 
	public static void main(String[] args) {
		new RxInterval();
	}
}
This entry was posted in Java, RxJava. Bookmark the permalink. Post a comment or leave a trackback: Trackback URL.

Post a Comment

Your email is never published nor shared. Required fields are marked *

*
*

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong> <pre lang="" line="" escaped="">