Mention Unpin requirement in Stream::by_ref() #2995

@svix-jbrown

Maybe this is just me being silly, but I found the following behavior very surprising:

Given this code:

use futures::stream::{self, Stream, StreamExt};

async fn demo<S: Stream<Item = u32>>(mut stream: S) {
    let sum = stream
        .by_ref()
        .take(2)
        .fold(0, |a, b| async move { a + b })
        .await;
    assert_eq!(sum, 3);
}

fn main() {
    futures::executor::block_on(demo(stream::iter(1..5)));
}

(which is basically the example from the documentation, but with the stream moved to a parameter instead of a literal)

You get this totally unhelpful compile failure:

$ cargo run
   Compiling stream_ref_test v0.1.0 (/Users/jbrown/tmp/2026-02-27/stream_ref_test)
error[E0507]: cannot move out of a mutable reference
    --> src/main.rs:4:15
     |
   4 |       let sum = stream
     |  _______________^
   5 | |         .by_ref()
     | |_________________^ move occurs because value has type `S`, which does not implement the `Copy` trait
   6 |           .take(2)
     |            ------- value moved due to this method call
     |
note: `futures::StreamExt::take` takes ownership of the receiver `self`, which moves value
    --> /Users/jbrown/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.32/src/stream/stream/mod.rs:1176:13
     |
1176 |     fn take(self, n: usize) -> Take<Self>
     |             ^^^^
help: if `S` implemented `Clone`, you could clone the value
    --> src/main.rs:3:15
     |
   3 |   async fn demo<S: Stream<Item = u32>>(mut stream: S) {
     |                 ^ consider constraining this type parameter with `Clone`
   4 |       let sum = stream
     |  _______________-
   5 | |         .by_ref()
     | |_________________- you could clone this value

For more information about this error, try `rustc --explain E0507`.
error: could not compile `stream_ref_test` (bin "stream_ref_test") due to 1 previous error

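That's because the impl of Stream for &mut S has an Unpin bound. Without that bound, &mut S is not itself a Stream, so .take(2) auto-derefs to S and tries to move the stream out of the mutable reference, which is where E0507 comes from. So in order to use by_ref() with a stream, the stream has to be Unpin: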

use futures::stream::{self, Stream, StreamExt};

async fn demo<S: Stream<Item = u32> + Unpin>(mut stream: S) {
    let sum = stream
        .by_ref()
        .take(2)
        .fold(0, |a, b| async move { a + b })
        .await;
    assert_eq!(sum, 3);
}

fn main() {
    futures::executor::block_on(demo(stream::iter(1..5)));
}

I recognize that the book talks about Unpin a bit, but it might also be worth throwing into the docs for StreamExt::by_ref()? What do y'all think?
