package rxjava;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Observer;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
public class RxInterval {
private final Random random = new Random();
public RxInterval() {
Observable<Integer> observable = Observable
.interval(5, TimeUnit.SECONDS)
/**
* startWith()
* do Observable dodawany jest element startowy, który emitowy jest przed pierwszym głównym,
* a między nimi nie ma zwłoki - efekt jest taki, że pierwszego Integer'a dostajemy od razu,
* a nie po 5 sekundach
*/
.startWith(0l)
.map(new Func1<Long, Integer>() {
@Override
public Integer call(Long t) {
return random.nextInt();
}
});
Observer<Integer> observer = new Observer<Integer>() {
/**
* observable skonczyl emisje
*/
@Override
public void onCompleted() {
}
/**
* observable gdzies sie wysypal
*/
@Override
public void onError(Throwable e) {
// TODO Auto-generated method stub
}
/**
* nowy element z observable
*/
@Override
public void onNext(Integer t) {
System.out.println(t);
}
};
observable
.subscribeOn(Schedulers.immediate())
.observeOn(Schedulers.immediate())
.subscribe(observer);
}
public static void main(String[] args) {
new RxInterval();
}
} |
package rxjava;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Observer;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
public class RxInterval {
private final Random random = new Random();
public RxInterval() {
Observable<Integer> observable = Observable
.interval(5, TimeUnit.SECONDS)
/**
* startWith()
* do Observable dodawany jest element startowy, który emitowy jest przed pierwszym głównym,
* a między nimi nie ma zwłoki - efekt jest taki, że pierwszego Integer'a dostajemy od razu,
* a nie po 5 sekundach
*/
.startWith(0l)
.map(new Func1<Long, Integer>() {
@Override
public Integer call(Long t) {
return random.nextInt();
}
});
Observer<Integer> observer = new Observer<Integer>() {
/**
* observable skonczyl emisje
*/
@Override
public void onCompleted() {
}
/**
* observable gdzies sie wysypal
*/
@Override
public void onError(Throwable e) {
// TODO Auto-generated method stub
}
/**
* nowy element z observable
*/
@Override
public void onNext(Integer t) {
System.out.println(t);
}
};
observable
.subscribeOn(Schedulers.immediate())
.observeOn(Schedulers.immediate())
.subscribe(observer);
}
public static void main(String[] args) {
new RxInterval();
}
}