Vapor + Vapor Cloudでクローラー的バッチ処理
この記事はVASILY Advent Calendar 2017の17日目の記事です。
また、以下の記事の続きです。
アプリにJSONを返す部分を安全にする話を前回したので、今度は取引所のAPIを叩いてデータを取得するバッチを紹介する。
データ集めのバッチもVaporの Command の仕組みで動かしている(ここでは詳細は割愛するが、Vapor Cloudはcronもサポートしている)。
今度はバッチ側が取引所APIのクライアントサイドの位置付けになるので、まずは自作フレームワークAbstractionKit を用いて各取引所APIを抽象化し、RxのObservableを経由してデータを取得するクライアント層を作る。
Coincheckの売買価格を取得するAPIをこう定義する。
struct CoincheckEndpoints { struct CurrentRate: EndpointDefinition { typealias Response = SingleResponse<Element> typealias Environment = CoincheckEnvironment struct Element: Himotoki.Decodable, SingleResponseElement { // 略 } var path: String = "/api/exchange/orders/rate" var method: HTTPMethod = .get var parameters: [String: Any] enum Order { case sell case buy var name: String { switch self { case .buy: return "buy" case .sell: return "sell" } } } init(orderType: Order) { parameters = [ "order_type": orderType.name, "pair": "btc_jpy", "amount": 0.005, ] } } }
EndpointDefinition はAbstractionKitが提供するプロトコルの一つで、各エンドポイントのリクエストパラメータ、レスポンスの型、HTTPメソッド、パスなどを定義する。
次いで、 EndpointDefinition のインスタンスを受けて実際にリクエストを投げ、結果を取得できるObservableを返す request メソッドを作る。
static func request<Endpoint: EndpointDefinition>(_ endpoint: Endpoint) -> Single<Endpoint.Response.Result> { return Single.create(subscribe: { (observer) -> Disposable in DispatchQueue.global(qos: .default).async { do { let response: Response let absoluteURLString = Endpoint.environment.url(forPath: endpoint.path).absoluteString let parameters = endpoint.parameters.mapValues({ (value) -> String in String.init(describing: value) }) switch endpoint.method { case .get: response = try httpClient.get(absoluteURLString, query: parameters) case .post: response = try httpClient.post(absoluteURLString, query: parameters) case .delete: response = try httpClient.delete(absoluteURLString, query: parameters) case .patch: response = try httpClient.patch(absoluteURLString, query: parameters) } if response.status.statusCode >= 300 { // Error observer(.error(NetworkError.uncategorized(message: response.status.reasonPhrase))) return } else if let bytes = response.body.bytes { let data = Data.init(bytes: bytes) guard let json = try JSONSerialization.jsonObject(with: data, options: []) as? Endpoint.Response.JSON else { observer(.error(NetworkError.uncategorized(message: "Failed to cast JSON type"))) return } let result = try Endpoint.Response.init(json: json).result observer(.success(result)) } else { observer(.error(NetworkError.uncategorized(message: "Failed to extract bytes"))) return } } catch let error { observer(.error(error)) } } return Disposables.create { } })
必要になったものから実装しているので、HTTPリクエストの機能をすべて網羅していないが気にしないでほしい。
エンドポイントの定義とリクエストメソッドが揃ったので、以下のように通信を起動する事ができるようになった。
swift let endpoint = CoincheckEndpoints.CurrentRate.init(orderType: .buy) APIClient.request(endpoint).subscribe().disposed(by: bag) ExchangeClient というプロトコルを導入し、取引所APIをジェネリックに扱えるようにする。
protocol ExchangeClient { func currentAskRate() -> Single<Ask> func currentBidRate() -> Single<Bid> }
クロール処理自体はRxSwift+RxBlockingで管理している。結局同期的にやるのだが、Rxに乗せておけば順番を入れ替えたり並列処理に変えたりというのが極めて簡単にできる。楽。
public func run(arguments: [String]) throws { let clients: [ExchangeClient] = [ CoincheckClient.init(), BitflyerClient.init(), ] let askRatesObservable = Observable.combineLatest(clients.map { $0.currentAskRate().asObservable().take(1) }) let bidRatesObservable = Observable.combineLatest(clients.map { $0.currentBidRate().asObservable().take(1) }) let asks = try askRatesObservable.toBlocking().first() // ここで同期的に通信が完了するのを待っている let bids = try bidRatesObservable.toBlocking().first() try asks?.forEach { try $0.makeQuery(driver).save() } try bids?.forEach { try $0.makeQuery(driver).save() } }
Vapor Cloudの一番安いDBインスタンスは、直接SQLクライアントアプリ等からつなぐことが出来ず、phpMyAdminが提供されるだけなのがちょっとつらいが、まあ安いので目をつぶろう、という感じです。
簡単にだが、クローラーバッチの紹介を以上とする。
次は23日に
- SwaggerからSwiftのコードを生成するPython製ジェネレータ
の記事を書くから期待してほしい。