FS2
Functional programming-based streaming library. Provides purely functional and resource-safe stream processing. Integration with Cats Effect manages side effects at the type level, enabling composable stream processing.
Framework
FS2
Overview
FS2 is a functional programming-based streaming library. It provides purely functional and resource-safe stream processing; its integration with Cats Effect manages side effects at the type level, enabling composable stream processing.
Details
FS2 (Functional Streams for Scala) is a functional streaming library developed as part of the Typelevel ecosystem. Development began in 2016, and the library is designed around purely functional programming principles. Its key feature is managing all side effects at the type level, which significantly reduces runtime errors and enables composable stream processing. Deep integration with Cats Effect provides safe resource management and exception handling via the IO monad. From stream construction to execution, everything remains immutable and referentially transparent. It also includes practical features such as backpressure control and automatic resource cleanup through finalizers. It integrates seamlessly with other Typelevel libraries such as Http4s, Doobie, and Circe, enabling full-scale application development in a functional programming style.
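For example, a finalizer attached with Stream.bracket runs whether the stream completes, fails, or is cancelled; a minimal sketch (values are illustrative):
import cats.effect.IO
import fs2.Stream

// Acquire a resource, use it, and always run the release action,
// even if downstream processing raises an error or is cancelled.
val withFinalizer: IO[Unit] = Stream
  .bracket(IO.println("acquire"))(_ => IO.println("release"))
  .flatMap(_ => Stream.range(1, 4).covary[IO])
  .evalMap(n => IO.println(s"use: $n"))
  .compile
  .drain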
Pros and Cons
Pros
- Type Safety: Manages side effects at the type level, catching many errors at compile time
- Composability: Build complex processing by combining small stream components
- Resource Safety: Automatic resource management and cleanup
- Referential Transparency: Predictable behavior through purely functional approach
- Rich Combinators: A large vocabulary of stream operations (see the sketch after these lists)
- Cats Effect Benefits: Powerful concurrency and error handling
Cons
- Learning Curve: Requires deep understanding of functional programming
- Performance: Overhead from purely functional approach
- Error Messages: Type error messages can become complex
- Ecosystem: Integration outside the Typelevel stack is limited
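A minimal sketch of how small pure streams and their combinators compose (values are illustrative):
import fs2.Stream

// Small pure streams compose like ordinary values
val evens = Stream.range(0, 100).filter(_ % 2 == 0)
val firstFiveSquares = evens.map(n => n * n).take(5)

// Combinators such as zipWithIndex and map build larger pipelines
val labelled = firstFiveSquares.zipWithIndex.map { case (n, i) => s"#$i -> $n" }

// Pure streams can be inspected without any effect type
val result: List[String] = labelled.toList
// List("#0 -> 0", "#1 -> 4", "#2 -> 16", "#3 -> 36", "#4 -> 64")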
Main Use Cases
- Data processing in functional applications
- File I/O streaming
- HTTP/WebSocket streaming
- Database streaming
- Concurrent processing pipelines
- Real-time data transformation
- ETL processing
Basic Usage
Adding Dependencies
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % "3.9.0",
"co.fs2" %% "fs2-io" % "3.9.0",
"org.typelevel" %% "cats-effect" % "3.5.0"
)
Basic Stream Processing
import fs2.Stream
import cats.effect.{IO, IOApp}
object BasicStreamExample extends IOApp.Simple {
def run: IO[Unit] = {
// Basic stream
val numbers = Stream.range(1, 11)
// Stream transformation and execution
val program = numbers
.map(_ * 2)
.filter(_ % 4 == 0)
.evalMap(n => IO.println(s"Processing: $n"))
.compile
.drain
program
}
}
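compile also offers terminal operations other than drain; a minimal sketch that collects the results instead of discarding them:
import fs2.Stream
import cats.effect.IO

// Collect the output into a List inside IO instead of discarding it
val collected: IO[List[Int]] = Stream.range(1, 11).covary[IO].map(_ * 2).compile.toList

// Pure streams can skip the effect type entirely
val pureList: List[Int] = Stream.range(1, 11).map(_ * 2).toList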
IO Integration
import fs2.Stream
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._
object IOIntegrationExample extends IOApp.Simple {
def run: IO[Unit] = {
// IO-based stream processing
val ioStream = Stream
.repeatEval(IO.println("Tick").as(1))
.metered(1.second)
.take(5)
.compile
.drain
// Parallel processing: evaluate up to 4 effects concurrently
val parallelStream = Stream
.range(1, 21)
.covary[IO]
.parEvalMap(4) { n =>
IO.sleep(100.millis) >> IO.println(s"Processed: $n")
}
.compile
.drain
ioStream >> parallelStream
}
}
File Processing
import fs2.{Stream, text}
import fs2.io.file.{Files, Path}
import cats.effect.{IO, IOApp}
object FileProcessingExample extends IOApp.Simple {
def run: IO[Unit] = {
val inputPath = Path("input.txt")
val outputPath = Path("output.txt")
// File reading, processing, and writing
val fileProcessing = Files[IO]
.readAll(inputPath)
.through(text.utf8.decode)
.through(text.lines)
.filter(_.nonEmpty)
.map(_.toUpperCase)
.intersperse("\n")
.through(text.utf8.encode)
.through(Files[IO].writeAll(outputPath))
.compile
.drain
fileProcessing
}
}
Resource Management
import fs2.Stream
import cats.effect.{IO, IOApp, Resource}
import java.io.{BufferedReader, FileReader}
object ResourceManagementExample extends IOApp.Simple {
def run: IO[Unit] = {
// Safe resource management
val fileResource = Resource.make(
IO(new BufferedReader(new FileReader("data.txt")))
)(reader => IO(reader.close()))
// Resource usage in stream
val streamWithResource = Stream
.resource(fileResource)
.flatMap { reader =>
Stream
.repeatEval(IO(Option(reader.readLine())))
.unNoneTerminate
}
.evalMap(line => IO.println(s"Read: $line"))
.compile
.drain
streamWithResource
}
}
Error Handling
import fs2.Stream
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._
object ErrorHandlingExample extends IOApp.Simple {
def run: IO[Unit] = {
// Stream that may cause errors
val riskyStream = Stream
.range(1, 11)
.evalMap { n =>
if (n == 5) IO.raiseError(new RuntimeException("Error at 5"))
else IO.println(s"Processing: $n")
}
// Error handling
val recoveredStream = riskyStream
.handleErrorWith { error =>
Stream.eval(IO.println(s"Error occurred: ${error.getMessage}"))
}
.compile
.drain
// Retry: re-run the failing effect up to 3 times with exponential backoff
val retryStream = Stream
.retry(riskyStream.compile.drain, 1.second, _ * 2, 3) // initial delay, backoff, max attempts
.handleErrorWith(error => Stream.eval(IO.println(s"Gave up: ${error.getMessage}")))
.compile
.drain
recoveredStream >> retryStream
}
}
Concurrent Processing
import fs2.Stream
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._
object ConcurrentProcessingExample extends IOApp.Simple {
def run: IO[Unit] = {
// Concurrent stream processing
val producer = Stream
.repeatEval(IO.println("Producing item"))
.metered(500.millis)
.take(10)
val consumer = Stream
.repeatEval(IO.println("Consuming item"))
.metered(1.second)
.take(5)
// Concurrent execution
val concurrentStreams = Stream(producer, consumer)
.parJoin(2)
.compile
.drain
concurrentStreams
}
}
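The producer and consumer above run concurrently but do not actually exchange data. A minimal sketch of a real hand-off through a bounded cats.effect.std.Queue (object name and sizes are illustrative), which also applies backpressure when the consumer falls behind:
import cats.effect.std.Queue
import cats.effect.{IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object QueueHandOffSketch extends IOApp.Simple {
  def run: IO[Unit] =
    Stream.eval(Queue.bounded[IO, Option[Int]](16)).flatMap { queue =>
      val producer = Stream
        .range(1, 11)
        .covary[IO]
        .metered(100.millis)
        .evalMap(n => queue.offer(Some(n)))
        .onFinalize(queue.offer(None)) // signal end of input
      val consumer = Stream
        .fromQueueNoneTerminated(queue)
        .evalMap(n => IO.println(s"Consumed: $n"))
      consumer.concurrently(producer)
    }.compile.drain
}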
HTTP Client (Http4s Integration)
import fs2.Stream
import cats.effect.{IO, IOApp}
import org.http4s.client.{Client, JavaNetClientBuilder}
import org.http4s.{Request, Uri}
object HttpClientExample extends IOApp.Simple {
def run: IO[Unit] = {
JavaNetClientBuilder[IO].resource.use { client =>
val urls = List(
"https://api.example.com/data/1",
"https://api.example.com/data/2",
"https://api.example.com/data/3"
)
// Concurrent HTTP requests
val httpRequests = Stream
.emits(urls)
.evalMap(url => IO.fromEither(Uri.fromString(url)))
.map(uri => Request[IO](uri = uri))
.parEvalMap(3) { request =>
client.expect[String](request)
.handleErrorWith(error => IO(s"Error: ${error.getMessage}"))
}
.evalMap(response => IO.println(s"Response: $response"))
.compile
.drain
httpRequests
}
}
}
Database Processing (Doobie Integration)
import fs2.Stream
import cats.effect.{IO, IOApp}
import doobie._
import doobie.implicits._
object DatabaseExample extends IOApp.Simple {
case class User(id: Int, name: String, email: String)
// Database connection
val transactor = Transactor.fromDriverManager[IO](
"org.postgresql.Driver",
"jdbc:postgresql://localhost/mydb",
"user",
"password"
)
def run: IO[Unit] = {
// Streaming query
val usersStream = sql"SELECT id, name, email FROM users"
.query[User]
.stream
.transact(transactor)
// Data processing
val processedUsers = usersStream
.filter(_.name.startsWith("J"))
.map(user => user.copy(email = user.email.toLowerCase))
.evalMap(user => IO.println(s"User: $user"))
.compile
.drain
processedUsers
}
}
Custom Stream Transformation
import fs2.{Pipe, Stream}
import cats.effect.{IO, IOApp}
object CustomTransformationExample extends IOApp.Simple {
// Custom pipe definition
def batchPipe[A](batchSize: Int): Pipe[IO, A, List[A]] = {
_.chunkN(batchSize).map(_.toList)
}
def logPipe[A]: Pipe[IO, A, A] = {
_.evalMap { a =>
IO.println(s"Processing: $a").as(a)
}
}
def run: IO[Unit] = {
val program = Stream
.range(1, 21)
.through(logPipe)
.through(batchPipe(3))
.evalMap(batch => IO.println(s"Batch: $batch"))
.compile
.drain
program
}
}
Latest Trends (2025)
- FS2 3.9 Series: Performance improvements and Cats Effect 3.5 support
- Scala 3 Support: Integration with the latest Scala features
- gRPC Support: Streaming gRPC services via fs2-grpc
- Observability: OpenTelemetry integration
- WebAssembly: Browser execution via Scala.js, including its experimental WebAssembly backend
Summary
As of 2025, FS2 is established as the standard choice for stream processing in functional Scala. Its type safety, composability, and resource safety make it possible to build robust and maintainable streaming applications, and integration with the Typelevel ecosystem supports full-scale functional application development.