FS2

Functional programming-based streaming library. Provides purely functional and resource-safe stream processing. Integration with Cats Effect manages side effects at type level, enabling composable stream processing.

Scalafunctional programmingstreamingpurely functionalresource safeCats Effectcomposable

Framework

FS2

Overview

FS2 is a functional programming-based streaming library. It provides purely functional and resource-safe stream processing, with integration with Cats Effect to manage side effects at the type level, enabling composable stream processing.

Details

FS2 (Functional Streams for Scala) is a functional streaming library developed as part of the Typelevel ecosystem. Development began in 2016 and is designed based on purely functional programming principles. Its key feature is managing all side effects at the type level, significantly reducing runtime errors and enabling composable stream processing. Deep integration with Cats Effect provides safe resource management and exception handling using IO monads. From stream construction to execution, everything remains immutable and referentially transparent. It also includes practical features like backpressure control and automatic resource cleanup through finalizers. It integrates seamlessly with other Typelevel libraries like Http4s, Doobie, and Circe, enabling full-scale application development in functional programming style.

Pros and Cons

Pros

  • Type Safety: Manages side effects at type level with compile-time error detection
  • Composability: Build complex processing by combining small stream components
  • Resource Safety: Automatic resource management and cleanup
  • Referential Transparency: Predictable behavior through purely functional approach
  • Rich Combinators: Abundant stream operation functions
  • Cats Effect Benefits: Powerful concurrency and error handling

Cons

  • Learning Curve: Requires deep understanding of functional programming
  • Performance: Overhead from purely functional approach
  • Error Messages: Type error messages can become complex
  • Ecosystem: Limited integration with mainstream Scala libraries

Main Use Cases

  • Data processing in functional applications
  • File I/O streaming
  • HTTP/WebSocket streaming
  • Database streaming
  • Concurrent processing pipelines
  • Real-time data transformation
  • ETL processing

Basic Usage

Adding Dependencies

libraryDependencies ++= Seq(
  "co.fs2" %% "fs2-core" % "3.9.0",
  "co.fs2" %% "fs2-io" % "3.9.0",
  "org.typelevel" %% "cats-effect" % "3.5.0"
)

Basic Stream Processing

import fs2.Stream
import cats.effect.{IO, IOApp}

object BasicStreamExample extends IOApp.Simple {
  def run: IO[Unit] = {
    // Basic stream
    val numbers = Stream.range(1, 11)
    
    // Stream transformation and execution
    val program = numbers
      .map(_ * 2)
      .filter(_ % 4 == 0)
      .evalMap(n => IO.println(s"Processing: $n"))
      .compile
      .drain
    
    program
  }
}

IO Integration

import fs2.Stream
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._

object IOIntegrationExample extends IOApp.Simple {
  def run: IO[Unit] = {
    // IO-based stream processing
    val ioStream = Stream
      .repeatEval(IO.println("Tick").as(1))
      .metered(1.second)
      .take(5)
      .compile
      .drain
    
    // Parallel processing
    val parallelStream = Stream
      .range(1, 21)
      .evalMap { n =>
        IO.sleep(100.millis) >> IO.println(s"Processed: $n")
      }
      .parEvalMap(4)(_ => IO.unit)  // 4 concurrent processes
      .compile
      .drain
    
    ioStream >> parallelStream
  }
}

File Processing

import fs2.{Stream, text}
import fs2.io.file.{Files, Path}
import cats.effect.{IO, IOApp}

object FileProcessingExample extends IOApp.Simple {
  def run: IO[Unit] = {
    val inputPath = Path("input.txt")
    val outputPath = Path("output.txt")
    
    // File reading, processing, and writing
    val fileProcessing = Files[IO]
      .readAll(inputPath)
      .through(text.utf8.decode)
      .through(text.lines)
      .filter(_.nonEmpty)
      .map(_.toUpperCase)
      .intersperse("\n")
      .through(text.utf8.encode)
      .through(Files[IO].writeAll(outputPath))
      .compile
      .drain
    
    fileProcessing
  }
}

Resource Management

import fs2.Stream
import cats.effect.{IO, IOApp, Resource}
import java.io.{BufferedReader, FileReader}

object ResourceManagementExample extends IOApp.Simple {
  def run: IO[Unit] = {
    // Safe resource management
    val fileResource = Resource.make(
      IO(new BufferedReader(new FileReader("data.txt")))
    )(reader => IO(reader.close()))
    
    // Resource usage in stream
    val streamWithResource = Stream
      .resource(fileResource)
      .flatMap { reader =>
        Stream
          .repeatEval(IO(Option(reader.readLine())))
          .unNoneTerminate
      }
      .evalMap(line => IO.println(s"Read: $line"))
      .compile
      .drain
    
    streamWithResource
  }
}

Error Handling

import fs2.Stream
import cats.effect.{IO, IOApp}

object ErrorHandlingExample extends IOApp.Simple {
  def run: IO[Unit] = {
    // Stream that may cause errors
    val riskyStream = Stream
      .range(1, 11)
      .evalMap { n =>
        if (n == 5) IO.raiseError(new RuntimeException("Error at 5"))
        else IO.println(s"Processing: $n")
      }
    
    // Error handling
    val recoveredStream = riskyStream
      .handleErrorWith { error =>
        Stream.eval(IO.println(s"Error occurred: ${error.getMessage}"))
      }
      .compile
      .drain
    
    // Retry
    val retryStream = riskyStream
      .retry(3)
      .compile
      .drain
    
    recoveredStream >> retryStream
  }
}

Concurrent Processing

import fs2.Stream
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._

object ConcurrentProcessingExample extends IOApp.Simple {
  def run: IO[Unit] = {
    // Concurrent stream processing
    val producer = Stream
      .repeatEval(IO.println("Producing item"))
      .metered(500.millis)
      .take(10)
    
    val consumer = Stream
      .repeatEval(IO.println("Consuming item"))
      .metered(1.second)
      .take(5)
    
    // Concurrent execution
    val concurrentStreams = Stream(producer, consumer)
      .parJoin(2)
      .compile
      .drain
    
    concurrentStreams
  }
}

HTTP Client (Http4s Integration)

import fs2.Stream
import cats.effect.{IO, IOApp}
import org.http4s.client.{Client, JavaNetClientBuilder}
import org.http4s.{Request, Uri}

object HttpClientExample extends IOApp.Simple {
  def run: IO[Unit] = {
    JavaNetClientBuilder[IO].create.use { client =>
      val urls = List(
        "https://api.example.com/data/1",
        "https://api.example.com/data/2",
        "https://api.example.com/data/3"
      )
      
      // Concurrent HTTP requests
      val httpRequests = Stream
        .emits(urls)
        .evalMap(url => IO.fromEither(Uri.fromString(url)))
        .map(uri => Request[IO](uri = uri))
        .parEvalMap(3) { request =>
          client.expect[String](request)
            .handleErrorWith(error => IO(s"Error: ${error.getMessage}"))
        }
        .evalMap(response => IO.println(s"Response: $response"))
        .compile
        .drain
      
      httpRequests
    }
  }
}

Database Processing (Doobie Integration)

import fs2.Stream
import cats.effect.{IO, IOApp}
import doobie._
import doobie.implicits._

object DatabaseExample extends IOApp.Simple {
  case class User(id: Int, name: String, email: String)
  
  // Database connection
  val transactor = Transactor.fromDriverManager[IO](
    "org.postgresql.Driver",
    "jdbc:postgresql://localhost/mydb",
    "user",
    "password"
  )
  
  def run: IO[Unit] = {
    // Streaming query
    val usersStream = sql"SELECT id, name, email FROM users"
      .query[User]
      .stream
      .transact(transactor)
    
    // Data processing
    val processedUsers = usersStream
      .filter(_.name.startsWith("J"))
      .map(user => user.copy(email = user.email.toLowerCase))
      .evalMap(user => IO.println(s"User: $user"))
      .compile
      .drain
    
    processedUsers
  }
}

Custom Stream Transformation

import fs2.{Pipe, Stream}
import cats.effect.{IO, IOApp}

object CustomTransformationExample extends IOApp.Simple {
  // Custom pipe definition
  def batchPipe[A](batchSize: Int): Pipe[IO, A, List[A]] = {
    _.chunkN(batchSize).map(_.toList)
  }
  
  def logPipe[A]: Pipe[IO, A, A] = {
    _.evalMap { a =>
      IO.println(s"Processing: $a").as(a)
    }
  }
  
  def run: IO[Unit] = {
    val program = Stream
      .range(1, 21)
      .through(logPipe)
      .through(batchPipe(3))
      .evalMap(batch => IO.println(s"Batch: $batch"))
      .compile
      .drain
    
    program
  }
}

Latest Trends (2025)

  • FS2 3.9 Series: Performance improvements and Cats Effect 3.5 support
  • Scala 3 Support: Integration with latest Scala features
  • gRPC Support: Streaming gRPC services
  • Observability: OpenTelemetry integration
  • WebAssembly: Browser execution via Scala.js

Summary

FS2 is established as the standard choice for streaming processing in functional programming in 2025. Through type safety, composability, and resource safety, it enables building robust and maintainable streaming applications. Integration with the Typelevel ecosystem enables full-scale functional application development.