【RxJS】ユースケースから理解するRxJS
ユースケース
Observableを作りたい
-
new Observableで値を流していく
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-rxjs',
templateUrl: './rxjs.component.html',
styleUrls: ['./rxjs.component.scss'],
})
export class RxjsComponent implements OnInit {
constructor() {}
ngOnInit(): void {}
/**
* 注文処理
*/
order() {
const menu = new Observable((observable) => {
try {
observable.next('お冷');
setTimeout(() => {
observable.next('チャーハン');
observable.next('醤油拉麺');
setTimeout(() => {
observable.complete();
}, 5000);
}, 3000);
} catch (e) {
observable.error(e);
}
});
// 時間測定用
let time = 0;
const timer = setInterval(() => {
time++;
}, 1000);
menu.subscribe(
// 値が流れてきた時の処理
(result) => {
console.log(`[${time}秒] 提供:${result}`);
},
// エラーが発生した時の処理
(error) => {
console.error(error);
},
// 処理が完了した時の処理
() => {
console.log(`[${time}秒] お会計`);
clearInterval(timer);
}
);
}
}
-
実行結果
PromiseをObservableに変換したい
-
AmplifyとかのライブラリはPromiseで実装されているのでObservableに統一したい
-
fromを使う
import { Component, OnInit } from '@angular/core';
import { from } from 'rxjs';
import { Storage } from 'aws-amplify';
@Component({
selector: 'app-rxjs',
templateUrl: './rxjs.component.html',
styleUrls: ['./rxjs.component.scss'],
})
export class RxjsComponent implements OnInit {
constructor() {}
ngOnInit(): void {}
/**
* 注文処理
*/
orderRamen() {
from(Storage.list('', { level: 'protected' })).subscribe((result) => {
console.log(result);
});
}
}
複数の非同期通信の結果を待ち合わせて取得する
-
ZIPを使う
-
それぞれの非同期通信の実行結果を取得してログに出力するだけ
import { Component, OnInit } from '@angular/core';
import { from, zip } from 'rxjs';
import { map } from 'rxjs/operators';
@Component({
selector: 'app-rxjs',
templateUrl: './rxjs.component.html',
styleUrls: ['./rxjs.component.scss'],
})
export class RxjsComponent implements OnInit {
constructor() {}
ngOnInit(): void {}
/**
* 注文処理
*/
orderRamen() {
// 注文を受けた順に格納(1人目「太麺、醤油」 2人目「細麺、塩」3人目「たまご麺、味噌」)
let menOrder = from(['太麺', '細麺', 'たまご麺']);
let soupOrder = from(['醤油', '塩', '味噌']);
zip(menOrder, soupOrder)
.pipe(
map(([men, soup]) => {
// 順番に結果を取得して提供
console.log(`[麺]:${men}, [スープ]:${soup}`);
})
)
.subscribe();
}
}
-
実行結果
-
observableにそれぞれ値が流れてきてから処理を実施している
-
非同期通信でエラーが発生した場合
import { Component, OnInit } from '@angular/core';
import { from, throwError, zip } from 'rxjs';
import { map } from 'rxjs/operators';
@Component({
selector: 'app-rxjs',
templateUrl: './rxjs.component.html',
styleUrls: ['./rxjs.component.scss'],
})
export class RxjsComponent implements OnInit {
constructor() {}
ngOnInit(): void {}
/**
* 注文処理
*/
orderRamen() {
// 注文を受けた順に格納(1人目「太麺、醤油」 2人目「細麺、塩」3人目「たまご麺、味噌」)
let menOrder = from(['太麺', '細麺', 'たまご麺']);
let soupOrder = from(['醤油', '塩', '味噌']);
let error = throwError("落としたException");
zip(menOrder, soupOrder, error)
.pipe(
map(([men, soup]) => {
// 順番に結果を取得して提供
console.log(`[麺]:${men}, [スープ]:${soup}`);
})
)
.subscribe(
(result)=>{},
(error)=>{
console.log(error);
}
);
}
}
-
実行結果
-
subscribe側でエラーを検知
-
subscribe側に値を渡したい
-
map内でreturnを使えばobservable.nextに値が設定される
import { Component, OnInit } from '@angular/core';
import { from, zip } from 'rxjs';
import { map } from 'rxjs/operators';
@Component({
selector: 'app-rxjs',
templateUrl: './rxjs.component.html',
styleUrls: ['./rxjs.component.scss'],
})
export class RxjsComponent implements OnInit {
constructor() {}
ngOnInit(): void {}
/**
* 注文処理
*/
orderRamen() {
// 注文を受けた順に格納(1人目「太麺、醤油」 2人目「細麺、塩」3人目「たまご麺、味噌」)
let menOrder = from(['太麺', '細麺', 'たまご麺']);
let soupOrder = from(['醤油', '塩', '味噌']);
zip(menOrder, soupOrder)
.pipe(
map(([menSozai, soupSozai]) => {
// 順番に結果を取得して提供
return { men: menSozai, soup: soupSozai };
})
)
.subscribe(
(result) => {
console.log(result);
},
(error) => {
console.log(error);
}
);
}
}
-
実行結果
共通化して処理の切り出し
import { Component, OnInit } from '@angular/core';
import { from, Observable, zip } from 'rxjs';
import { map } from 'rxjs/operators';
@Component({
selector: 'app-rxjs',
templateUrl: './rxjs.component.html',
styleUrls: ['./rxjs.component.scss'],
})
export class RxjsComponent implements OnInit {
constructor() {}
ngOnInit(): void {}
/**
* 注文処理
*/
orderRamen() {
// 注文を受けた順に格納(1人目「太麺、醤油」 2人目「細麺、塩」3人目「たまご麺、味噌」)
let menOrder = from(['太麺', '細麺', 'たまご麺']);
let soupOrder = from(['醤油', '塩', '味噌']);
this.createRamen(menOrder, soupOrder).subscribe((result) => {
console.log(`[麺]:${result.men}, [スープ]:${result.soup}`);
});
}
/**
* 作成処理
* @param menOrder 麺の注文
* @param soupOrder スープの注文
*/
private createRamen(
menOrder: Observable<string>,
soupOrder: Observable<string>
): Observable<any> {
return zip(menOrder, soupOrder).pipe(
map(([menSozai, soupSozai]) => {
// 順番に結果を取得して提供
return { men: menSozai, soup: soupSozai };
})
);
}
}
非同期処理の実行結果を使って非同期処理を実行する
-
以下のオペレーターを仕様によって使い分ける
-
concatMap
-
値が流れてきた順番で実行
-
-
mergeMap
-
準備ができたものから随時実行
-
-
switchMap
-
流れてきた値の処理中に再度値が流れてきたら前の処理はキャンセル
-
-
例)Cognitoのトークンを取得してAPI Gatewayに通信する
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { switchMap } from 'rxjs/operators';
import { RestGetInput } from '../aws/api/data/rest-get-input';
import { RestService } from '../aws/api/rest.service';
import { CognitoService } from '../aws/auth/cognito.service';
@Injectable({
providedIn: 'root',
})
export class OrderService {
private API_NAME = 'RAMEN';
private API_PATH = '/order';
constructor(
private restService: RestService,
private cognitoService: CognitoService
) {}
/**
* 注文情報取得
* @param orderId
*/
public getOrder(orderId: string): Observable<IOrderInfo> {
let restGetInput = new RestGetInput();
restGetInput.apiName = this.API_NAME;
restGetInput.path = this.API_PATH;
restGetInput.init = { queryStringParameters: orderId };
return new Observable((observer) => {
// Token取得
this.cognitoService
.getAuthToken()
.pipe(
switchMap((token) => {
restGetInput.token = token;
// GET通信
return this.restService.get(restGetInput);
})
)
// 通信処理の確認
.subscribe(
(result) => {
// ステータスコードチェック
const statusCode = result.statusCode;
if (statusCode !== 200) {
// エラー通知
observer.error('取得処理に失敗');
return;
}
const respnse = result.response;
// 処理結果を通知
observer.next({
orderId: respnse.orderId,
order: respnse.order,
});
},
(error) => {
// エラー通知
observer.error('取得処理に失敗');
}
);
});
}
}
/**
* 注文情報
*/
interface IOrderInfo {
orderId: string;
order: Array<string>;
}