こんにちは、エンジニアの渡辺(@mochi_neko_7)です。
今回は Rust における Stream の基本的な取り回しを紹介します。
Anthropic の Claude API の Rust Client を書いていた際に Streaming API の対応をしていたところ、自身が Rust の Stream をちゃんと理解していなかったことに気づき勉強し直していました。
ところが日本語で解説している記事をあまり見かけなかったため、本記事で自分の理解している範囲で基礎的な内容を整理して紹介したいと思います。
もし間違った記述等ありましたらご指摘いただけますと幸いです。
環境
- Rust:
v1.76.0
- futures_core:
v0.3.30
- async_stream:
v0.3.5
- futures_util:
v0.3.30
- tokio_stream:
v0.1.15
Rust の Stream
そもそも Rust の Stream はどんなものか、という基礎の解説は英語ですが下記にあります。
こちらを参考にしていただきつつ、ここでは次の二点を押さえておきましょう。
- 非同期版 Iterator としての Stream
- Poll の扱い
1. 非同期版 Iterator としての Stream
Rust の Iterator は下記のような trait で定義されています。
pub trait Iterator { type Item; fn next(&mut self) -> Option<Self::Item>; }
Option<Self::Item>
の戻り値は値を返せる場合には Some(Item)
を、 Iterator の終端に達して値を返せない場合には None
を返すものです。
他言語だと boolean と nullable の組み合わせでこれらを表現したりしますが、Rust では Option でシンプルに表現できます。
一方、Stream の trait の定義を見てみると、非同期システムの都合で見かけが変わっている点はあるものの基本的には同じ形式をしていることがわかります。
pub trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context ) -> Poll<Option<Self::Item>>; }
Stream in futures_core::stream - Rust
本題にあまり関係ない size_hint
は省略しています。
実際、futures_util の StreamExt を利用すると
use futures_util::StreamExt; while let Some(item) = stream.next().await { // item の処理 }
のように Iterator に .await
を付けたような格好で呼び出すことができます。
2. Poll の扱い
Poll は下記で定義されています。
pub enum Poll<T> { Ready(T), Pending, }
状態として二種類あります。
Poll::Ready(T)
: 値T
を返す準備ができている状態Poll::Pending
: 値を返す準備ができていない状態
Stream の場合は T
に Option<Self::Item>
を指定するので、状態は次の三種類を取ります。
Poll::Ready(Some(T))
: 値T
を返す準備ができている状態Poll::Ready(None)
: Stream の終端に達した状態Poll::Pending
: 値を返す準備ができていない状態
Stream を読むだけの場合には StreamExt
の next().await
を呼ぶかと思いますので、Stream trait を自分で実装する場合以外は Poll の細かい取り扱いは気にしなくて済みます。
Stream の周辺 crate
Stream は Rust のコアには最低限の trait しか用意されておらず、さまざまな便利機能は現在では別 crate で提供されています。
1. futures_core
Stream の定義は futures_core
crate から提供されています。
他 crate もこの Stream を利用していますし Stream を Re-export してくれている場合も多いので、以下の拡張 crate を利用する際には不要な場合もあります。
2. async_stream
async_stream
は標準では提供されていない yield
構文による Stream の生成を、stream!
マクロによってサポートしています。
use async_stream::stream; use futures_core::stream::Stream; fn zero_to_three() -> impl Stream<Item = u32> { stream! { for i in 0..3 { yield i; } } }
ちなみに Iterator から Stream を生成したい場合には以下で紹介する crate の futures_util::stream::iter()
や tokio_stream::iter()
などを利用できます。
3. futures_util
futures_util
は Stream を含む Rust の非同期 API の拡張機能を提供します。
将来的に安定した機能は futures_core
に統合されるものらしいです。
特に重要なのが StreamExt
で、非同期 API .await
で呼び出せる StreamExt::next()
や StreamExt::map()
、StreamExt::filter()
などの Iterator と同様の Adapter 系機能などを提供します。
StreamExt in futures_util::stream - Rust
先にも使用例を書きましたが、futures_util::StreamExt
の利用イメージは下記になります。
use futures_util::StreamExt; async fn some_function() { let mut stream = futures_util::stream::iter(vec![0, 1, 2]); while let Some(value) = stream.next().await { println!("Got {}", value); } }
ただし、futures_util::StreamExt::next()
は非同期ではありますがシングルスレッドで動作するため、マルチスレッドを利用したい場合には tokio-stream
などマルチスレッドに対応している非同期バックエンドによる実装を利用した方が良い場合もありますので、ユースケースに合わせて選択してください。
4. tokio-stream
tokio の非同期バックエンドと連携した運用をする際には、tokio-stream
で提供されている拡張機能を利用することもできます。
futures_util::StreamExt
の代わりに tokio_stream::StreamExt
を使用するイメージです。
use tokio_stream::StreamExt; async fn some_function() { let mut stream = tokio_stream::stream::iter(vec![0, 1, 2]); while let Some(value) = stream.next().await { println!("Got {}", value); } }
ただし、現行バージョンでは poll_next_unpin
など futures_util::StreamExt
ではサポートしていても tokio_stream::StreamExt
ではサポートされていない機能などもありますので注意が必要です。
使い分け
futures_util
と tokio_stream
はどちらも StreamExt
を提供していますが、非同期バックエンドが異なるためユースケースに応じて使い分けが必要です。
futures_util
: 標準の非同期 API (futures_core
) バックエンド、シングルスレッドtokio_stream
:tokio
バックエンド、マルチスレッド対応
他の非同期バックエンドを利用する際には対応するものを探してみてください。
Stream の変換
Stream の変換は StreamExt の Adapter で済む場合もありますが、バッファリングをして最適化した場合など自分で Stream の変換処理を書いた方がいいケースもあります。
Stream の取り回しの確認も兼ねて、例として reqwest、つまり HTTP のレスポンスを Byte の Stream として受け取り、行ごとのテキストに分割して Stream として提供することを考えてみましょう。
この場合、Byte 配列をバッファリングしつつ改行コードを見つけたら Chunk を返す、という Stream の実装をします。
use std::pin::Pin; use std::task::{Context, Poll}; use bytes::{Buf, BytesMut}; use futures_core::Stream; use pin_project::pin_project; // Stream の処理のエラーの定義 (thiserrorを利用) #[derive(Debug, thiserror::Error)] enum LineStreamError { #[error("Failed to read reqwest stream: {0:?}")] ReqwestError(#[from] reqwest::Error), #[error("Failed to decode chunk to UTF-8 string: {0:?}")] StringDecodingError(#[from] std::string::FromUtf8Error), } // Stream の Item の定義 type LineStreamResult = Result<String, LineStreamError>; // 自作の行ごとのテキストの Stream の定義 #[pin_project] struct LineStream<S> where S: Stream<Item = ReqwestStreamItem> + Unpin, { #[pin] stream: S, // 元の Stream buffer: BytesMut, // bytes の可変長バッファ } // reqwest の Response Stream の Item type ReqwestStreamItem = Result<bytes::Bytes, reqwest::Error>; // 初期化メソッドの定義 impl<S> LineStream<S> where S: Stream<Item = ReqwestStreamItem> + Unpin, { fn new(stream: S) -> Self { LineStream { stream, buffer: BytesMut::new(), } } } // 目的の Stream trait の実装 impl<S> Stream for LineStream<S> where S: Stream<Item = ReqwestStreamItem> + Unpin, { type Item = LineStreamResult; // 最終的に欲しい Item の型をここで指定する fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<LineStreamResult>> { let mut this = selft.project(); // Unpin loop { // バッファに改行を含む場合 if let Some(position) = this .buffer .iter() .position(|b| *b == b'\n') { // position より手前の bytesを取り出す let line = this.buffer.split_to(position); // position にある改行はスキップする this.buffer.advance(1); // bytes を UTF-8 String にデコードする let line = String::from_utf8(line.to_vec()) .map_err(StreamLineError::StringDecodingError)?; // デコードした文字列を返す return Poll::Ready(Some(Ok(line))); } // 内部の Stream の読み込み match this .as_mut() .stream .poll_next(cx) { // bytes を受け取った場合 | Poll::Ready(Some(Ok(chunk))) => { // バッファに bytes を格納する this.buffer.extend(&chunk); // 手前の改行を取り出す処理に入るまで loop を継続する }, // Error が返ってきた場合 | Poll::Ready(Some(Err(error))) => { return Poll::Ready(Some(Err( StreamLineError::ReqwestError(error), ))); }, // Stream の終端に達した場合 | Poll::Ready(None) => { // バッファが空の場合は何もしない return if this.buffer.is_empty() { Poll::Ready(None) // バッファにデータが残っている場合は処理を試みる } else { let line = this.buffer.split_off(0); let line = String::from_utf8(line.to_vec()) .map_err(StreamLineError::StringDecodingError)?; Poll::Ready(Some(Ok(line))) }; }, // 待機中の場合はそのまま | Poll::Pending => return Poll::Pending, } } } }
利用している crate は下記になります。
futures_core
reqwest
bytes
pin_project
thiserror
まず、#[pin_project]
、#[pin]
、let mut this = selft.project();
などは元の Stream を Unpin するためのものです。
futures_util::StreamExt
には StreamExt::poll_next_unpin
という便利メソッドがあるのでそちらを使うと楽ですが、使わない場合には pin_project
crate を使って Unpin を実装できます。
StreamExt in futures_util::stream - Rust
今回の変換処理の主題はテキストの bytes を読み込んでバッファに溜めつつ、改行を見つけた場合に行分のテキストを取り出したいものでした。
元の Stream を読み込んでバッファに溜めていく処理は
// 内部の Stream の読み込み match this .as_mut() .stream .poll_next(cx) { ... }
のブロックで実装しています。
そして、
// バッファに改行を含む場合 if let Some(position) = this .buffer .iter() .position(|b| *b == b'\n') { ... }
の処理で改行コード \n
を探索し、発見した場合にはそれをテキストに変換して結果を返します。
実際に使用するイメージは下記になります。
use tokio_stream::StreamExt; use xxx::LineStream; #[tokio::main] async fn main() -> anyhow::Result<()> { let stream = reqwest::get("http://httpbin.org/ip") .await? .bytes_stream(); let mut line_stream = LineStream::new(stream); if let Some(line) = line_stream.next().await { println!("Line: {:?}", line); } Ok(()) }
ライブラリで Stream を利用する際に気をつけたいこと
アプリケーションとして Stream を扱う場合にはアプリケーション全体で利用する非同期バックエンドに準拠して実装をすれば良いですが、ライブラリとして Stream を扱う場合にはライブラリ利用者がどの非同期バックエンドを使うのか想定することはできません。
ライブラリ内部で特定の非同期バックエンドに依存した処理を他でしていない場合には、使用する crate を適切に選択肢、なるべく依存関係を少なく(つまりバイナリのサイズを小さく)したいでしょう。
その場合、futures_core
crate のみで実装し、futuers_util
、tokio_stream
の具体の拡張を利用しないのも選択肢の一つです。
具体例として自分が開発している Claude API の Rust Client
では Streaming モードの API もサポートしていますが、API としては
pub async fn create_a_message_stream( &self, request_body: MessagesRequestBody ) -> MessagesResult<impl futures_core::stream::Stream<Item = ChunkStreamResult>>
のように futures_core
のみ使用し、README や examples では futures_util
と tokio_stream
の両方を使ったサンプルを提供しています。
とはいえ futures_util
に関しては将来的にはコアに含められる可能性もあるため、どこまで利用すべきかはプロジェクト次第かと思います。
まとめ
以上で紹介した内容を端的にまとめると下記になります。
- Stream は非同期版の Iterator
next()
やmap()
、filter()
などの拡張機能は、futures_util
やtokio_stream
のStreamExt
などで提供されている- ユースケースに応じてコア crate(
futures_core
)、拡張 crate (async_stream
、futures_util
、tokio_stream
など)を使い分ける - Stream の変換処理は Adapter だと難しい(あるいは効率が悪い)場合でも、 Stream trait を自分で実装すれば可能
- Stream を利用したライブラリを作成する場合には
futures_core
のみで実装するのも選択肢の一つ
おわりに
自分で実際に Stream を利用したライブラリの開発を通じて理解した内容を基に、Stream の基本的な取り回しを紹介しました。
もっと深掘りをすれば TryStream や cx: &mut Context<'_>
って何?といった話もありそうですが、今回はもっともシンプルに Stream を扱う際に必要になる話に限定しました。
Rust の Stream は高度に抽象化されているのもあり、ちゃんと理解した後だとかなり扱いやすいなと思いました。
Rust の入門ドキュメントには Stream の解説がなかった気がしますが、似たような仕組みの Channel より扱いやすいので使えて損はないでしょう。
Claude の API の Rust Client の Streaming API の実装も参考にしていたければと思います。
Rust で何かの crate の API で提供される Stream を利用したい、その Stream を扱いたい形式に変換したい、あるいは Stream を使った API を含むライブラリを開発したい方に参考になれば幸いです。