ZIO SQL
ZIO SQL describes itself as "Type-safe, composable SQL for ZIO applications". It lets you write type-safe, type-inferred, and composable SQL queries in ordinary Scala, which helps catch persistence bugs at compile time and lets your IDE assist while you write SQL. Built on functional programming principles and integrated with the ZIO ecosystem, it is one option for database access in enterprise Scala applications.
GitHub Overview
zio-archive/zio-sql
Details
ZIO SQL targets both Scala 2.x and Scala 3. Its design rests on three ideas: type safety by construction, strong type inference through maximally variant and lower-kinded types, and a value-based design that needs no macros or compiler plugins. It supports four databases: PostgreSQL, MySQL, Microsoft SQL Server, and Oracle, with PostgreSQL the most feature-complete. By design it uses names that resemble SQL rather than mimicking the Scala collections, and it relies on reified lenses, contravariant intersection types, and in-query nullability to improve ergonomics for end users.
Key Features
- Type Safety: Queries are type-safe by construction, so most classes of bugs are caught at compile time
- Composability: All ZIO SQL components are ordinary values that can be transformed and composed
- Type Inference: Strong type inference through maximally variant and lower-kinded types
- No Magic: No macros or plugins required; everything is a value
- ZIO Integration: Queries run as ZIO effects and fit into the rest of the ZIO ecosystem
- SQL-like Syntax: Names resemble SQL while queries still benefit from Scala's type system
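To make "components are ordinary values" concrete, here is a minimal plain-Scala sketch. It does not use ZIO SQL: the `Predicate` and `Select` types, the `render` functions, and the column names are all hypothetical, invented only to show how value-based query fragments can be stored, passed around, and combined before anything is rendered or executed.

```scala
// Hypothetical mini-DSL for illustration only; this is NOT the ZIO SQL API.
object ValueBasedSketch {
  // A predicate is an immutable value that can be combined with another predicate.
  sealed trait Predicate {
    def &&(that: Predicate): Predicate = And(this, that)
  }
  final case class Eq(column: String, value: String)      extends Predicate
  final case class Like(column: String, pattern: String)  extends Predicate
  final case class And(left: Predicate, right: Predicate) extends Predicate

  final case class Select(columns: List[String], table: String, where: Option[Predicate] = None) {
    // Returns a new query value; the original query is left untouched.
    def filter(p: Predicate): Select = copy(where = Some(where.fold(p)(_ && p)))
  }

  // Rendering inlines literals for readability only; a real library would bind parameters.
  def render(p: Predicate): String = p match {
    case Eq(c, v)   => s"$c = '$v'"
    case Like(c, v) => s"$c LIKE '$v'"
    case And(l, r)  => s"(${render(l)} AND ${render(r)})"
  }

  def render(q: Select): String = {
    val base = s"SELECT ${q.columns.mkString(", ")} FROM ${q.table}"
    q.where.fold(base)(p => s"$base WHERE ${render(p)}")
  }
}
```

Because `filter` returns a new value, a base query such as `Select(List("id", "username"), "users")` can be reused and refined in several places without any of the refinements affecting each other.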
Pros and Cons
Pros
- Compile-time query validation catches most issues early
- Rich integration with the ZIO ecosystem improves development efficiency
- Purely functional design improves predictability and maintainability
- Improved type safety and ergonomics compared to Slick and Doobie
- Minimal dependencies: relies only on ZIO
- SQL-like syntax that still makes full use of the Scala type system
Cons
- Exclusive to the Scala/ZIO ecosystem, with no support for other languages
- Learning curve, because the API relies on path-dependent types
- Relatively new library that may be less mature than Slick or Doobie
- Limited database support (only four databases)
- Building complex dynamic queries can be awkward
- Requires an understanding of functional programming and ZIO
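For readers unfamiliar with the path-dependent types mentioned in the cons, the following plain-Scala sketch shows the general idea. It is not ZIO SQL code: the `Table` class and its inner `Column` type are hypothetical and only illustrate how a type can belong to a specific value, so that a column from one table is rejected where a column from another table is expected.

```scala
// Hypothetical illustration of path-dependent types; not taken from ZIO SQL.
object PathDependentSketch {
  class Table(val name: String) {
    // Each Table instance has its own Column type: users.Column is not posts.Column.
    final case class Column(columnName: String)

    // Accepts only columns that belong to this particular table instance.
    def select(cols: Column*): String =
      s"SELECT ${cols.map(_.columnName).mkString(", ")} FROM $name"
  }

  val users = new Table("users")
  val posts = new Table("posts")

  val userName  = users.Column("username")
  val postTitle = posts.Column("title")

  val ok = users.select(userName)
  // users.select(postTitle) // does not compile: posts.Column is not users.Column
}
```

The benefit is that mixing up tables becomes a compile error; the cost is that error messages and type signatures mention the owning value, which takes some getting used to.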
Code Examples
Setup
// build.sbt
ThisBuild / scalaVersion := "2.13.12"

libraryDependencies ++= Seq(
  "dev.zio" %% "zio"               % "2.1.18",
  "dev.zio" %% "zio-sql-postgres"  % "0.1.2", // PostgreSQL
  "dev.zio" %% "zio-sql-mysql"     % "0.1.2", // MySQL
  "dev.zio" %% "zio-sql-oracle"    % "0.1.2", // Oracle
  "dev.zio" %% "zio-sql-sqlserver" % "0.1.2", // SQL Server
  // Test dependencies
  "dev.zio" %% "zio-test"     % "2.1.18" % Test,
  "dev.zio" %% "zio-test-sbt" % "2.1.18" % Test,
  // JDBC drivers
  "org.postgresql" % "postgresql" % "42.7.3"
)

testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")

// project/plugins.sbt
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.4")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1")
Basic Usage
import zio._
import zio.sql.postgresql._
import java.util.UUID
import java.time.LocalDateTime

// Table definitions
object Schema {
  // Users table
  final case class Users(id: UUID, username: String, email: String, createdAt: LocalDateTime)
  val users = defineTable[Users]("users")
  val (userId, userName, userEmail, userCreatedAt) = users.columns

  // Posts table
  final case class Posts(id: UUID, title: String, content: String, authorId: UUID, published: Boolean)
  val posts = defineTable[Posts]("posts")
  val (postId, postTitle, postContent, postAuthorId, postPublished) = posts.columns
}

object DatabaseExample extends ZIOAppDefault {
  import Schema._

  // Database connection configuration
  private val connectionPoolConfig = ZConnectionPoolConfig.default
  private val dataSourceLayer = ZLayer.scoped {
    ConnectionPool.dataSourceScoped
  }
  private val connectionPoolLayer = dataSourceLayer >>> ConnectionPool.live(connectionPoolConfig)

  def run: ZIO[Any, Any, Any] = {
    // Local names (newUserId, newPostId, authorPosts) are chosen so they do not
    // shadow the userId/postId columns or the posts table imported from Schema
    val program = for {
      _           <- Console.printLine("ZIO SQL sample started")
      // Create user
      newUserId   <- createUser("john_doe", "john_doe@example.com")
      _           <- Console.printLine(s"Created user ID: $newUserId")
      // Search user
      user        <- getUserByUsername("john_doe")
      _           <- Console.printLine(s"Found user: $user")
      // Create post
      newPostId   <- createPost("My First Post", "This is my first post content.", newUserId)
      _           <- Console.printLine(s"Created post ID: $newPostId")
      // Get posts list
      authorPosts <- getPostsByAuthor(newUserId)
      _           <- Console.printLine(s"Author's post count: ${authorPosts.length}")
    } yield ()

    program
      .provide(connectionPoolLayer)
      .foldZIO(
        error => Console.printLineError(s"Error: $error") *> ZIO.succeed(ExitCode.failure),
        _     => Console.printLine("Program completed") *> ZIO.succeed(ExitCode.success)
      )
  }

  // Create user
  def createUser(username: String, email: String): ZIO[ConnectionPool, Throwable, UUID] = {
    val newUserId = UUID.randomUUID()
    val insertQuery = insertInto(users)(userId, userName, userEmail, userCreatedAt)
      .values((newUserId, username, email, LocalDateTime.now()))

    // The row count is discarded; the generated ID is returned instead
    execute(insertQuery).as(newUserId)
  }

  // Search by username
  def getUserByUsername(username: String): ZIO[ConnectionPool, Throwable, Option[Users]] = {
    val selectQuery = select(userId, userName, userEmail, userCreatedAt)
      .from(users)
      .where(userName === username)

    execute(selectQuery.to((Users.apply _).tupled)).map(_.headOption)
  }

  // Create post
  def createPost(title: String, content: String, authorId: UUID): ZIO[ConnectionPool, Throwable, UUID] = {
    val newPostId = UUID.randomUUID()
    val insertQuery = insertInto(posts)(postId, postTitle, postContent, postAuthorId, postPublished)
      .values((newPostId, title, content, authorId, true))

    execute(insertQuery).as(newPostId)
  }

  // Search posts by author
  def getPostsByAuthor(authorId: UUID): ZIO[ConnectionPool, Throwable, List[Posts]] = {
    val selectQuery = select(postId, postTitle, postContent, postAuthorId, postPublished)
      .from(posts)
      .where(postAuthorId === authorId)
      .orderBy(Ordering.Desc(postId))

    execute(selectQuery.to((Posts.apply _).tupled))
  }
}
Advanced Queries and JOIN Operations
import zio._
import zio.sql.postgresql._
import java.util.UUID
import java.time.LocalDateTime

object AdvancedQueries {
  import Schema._

  // JOIN users and post counts
  final case class UserWithPostCount(
    userId: UUID,
    username: String,
    email: String,
    postCount: Long
  )

  def getUsersWithPostCount: ZIO[ConnectionPool, Throwable, List[UserWithPostCount]] = {
    // LEFT OUTER JOIN keeps users that have no posts (their count is 0)
    val query = select(userId, userName, userEmail, Count(postId))
      .from(users.leftOuter(posts).on(userId === postAuthorId))
      .groupBy(userId, userName, userEmail)
      .orderBy(Ordering.Desc(Count(postId)))

    execute(query.to((UserWithPostCount.apply _).tupled))
  }

  // JOIN posts and author information
  final case class PostWithAuthor(
    postId: UUID,
    title: String,
    content: String,
    authorName: String,
    authorEmail: String
  )

  def getPostsWithAuthor: ZIO[ConnectionPool, Throwable, List[PostWithAuthor]] = {
    val query = select(postId, postTitle, postContent, userName, userEmail)
      .from(posts.join(users).on(postAuthorId === userId))
      .where(postPublished === true)
      .orderBy(Ordering.Desc(postId))

    execute(query.to((PostWithAuthor.apply _).tupled))
  }

  // Complex conditional search: each filter is applied only when it is defined
  def searchPosts(
    titleKeyword: Option[String],
    authorKeyword: Option[String],
    published: Option[Boolean]
  ): ZIO[ConnectionPool, Throwable, List[PostWithAuthor]] = {
    val baseQuery = select(postId, postTitle, postContent, userName, userEmail)
      .from(posts.join(users).on(postAuthorId === userId))

    val queryWithConditions = (titleKeyword, authorKeyword, published) match {
      case (Some(title), Some(author), Some(pub)) =>
        baseQuery.where(
          (postTitle like s"%$title%") &&
            (userName like s"%$author%") &&
            (postPublished === pub)
        )
      case (Some(title), Some(author), None) =>
        baseQuery.where(
          (postTitle like s"%$title%") &&
            (userName like s"%$author%")
        )
      case (Some(title), None, Some(pub)) =>
        baseQuery.where(
          (postTitle like s"%$title%") &&
            (postPublished === pub)
        )
      case (None, Some(author), Some(pub)) =>
        baseQuery.where(
          (userName like s"%$author%") &&
            (postPublished === pub)
        )
      case (Some(title), None, None) =>
        baseQuery.where(postTitle like s"%$title%")
      case (None, Some(author), None) =>
        baseQuery.where(userName like s"%$author%")
      case (None, None, Some(pub)) =>
        baseQuery.where(postPublished === pub)
      case (None, None, None) =>
        baseQuery
    }

    execute(queryWithConditions.orderBy(Ordering.Desc(postId)).to((PostWithAuthor.apply _).tupled))
  }

  // Aggregate queries
  final case class PostStatistics(
    totalPosts: Long,
    publishedPosts: Long,
    draftPosts: Long,
    averageContentLength: Option[Double]
  )

  def getPostStatistics: ZIO[ConnectionPool, Throwable, PostStatistics] = {
    val query = select(
      Count(postId),
      Count(postId).filter(postPublished === true),
      Count(postId).filter(postPublished === false),
      Avg(Length(postContent))
    ).from(posts)

    // An aggregate without GROUP BY always yields exactly one row, so head is safe here
    execute(query.to((PostStatistics.apply _).tupled)).map(_.head)
  }

  // Subqueries
  def getUsersWithRecentPosts(days: Int): ZIO[ConnectionPool, Throwable, List[Users]] = {
    val recentDate = LocalDateTime.now().minusDays(days.toLong)

    // All authors that have at least one post
    val subquery = select(postAuthorId)
      .from(posts)
      .distinct

    // Posts has no timestamp column in this schema, so recency is checked on
    // users.createdAt in the outer query, where that column is in scope
    val query = select(userId, userName, userEmail, userCreatedAt)
      .from(users)
      .where((userId in subquery) && (userCreatedAt > recentDate))
      .orderBy(Ordering.Desc(userCreatedAt))

    execute(query.to((Users.apply _).tupled))
  }
}
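The pattern match in `searchPosts` needs one case per combination of optional filters, so it doubles in size with every filter added. A common alternative is to collect the filters that are present and fold them together with AND. The sketch below shows that idea in plain Scala with a hypothetical `Cond` type standing in for a boolean SQL expression; whether the same fold type-checks against ZIO SQL's own expression types is an assumption that has not been verified here.

```scala
// Hypothetical stand-in types for illustration; not the ZIO SQL API.
object OptionalFilterSketch {
  // A rendered boolean condition that can be AND-ed with another one.
  final case class Cond(sql: String) {
    def &&(that: Cond): Cond = Cond(s"$sql AND ${that.sql}")
  }

  // Builds the WHERE condition from whichever filters are defined.
  // Returns None when no filter is given, meaning "no WHERE clause".
  // Literals are inlined for readability only; real code should bind parameters.
  def buildWhere(
    titleKeyword: Option[String],
    authorKeyword: Option[String],
    published: Option[Boolean]
  ): Option[Cond] =
    List(
      titleKeyword.map(t => Cond(s"title LIKE '%$t%'")),
      authorKeyword.map(a => Cond(s"username LIKE '%$a%'")),
      published.map(p => Cond(s"published = $p"))
    ).flatten.reduceOption(_ && _)
}
```

Adding a fourth optional filter then means adding one more list element rather than eight more cases.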
Transactions and Error Handling
import zio._
import zio.sql.postgresql._
import java.util.UUID
import java.time.LocalDateTime

object TransactionExamples {
  import Schema._

  // Simultaneous creation of user and post within transaction.
  // Parameters are named title/content so they do not shadow the
  // postTitle/postContent columns imported from Schema.
  def createUserWithPost(
    username: String,
    email: String,
    title: String,
    content: String
  ): ZIO[ConnectionPool, Throwable, (UUID, UUID)] = {
    val transaction = for {
      newUserId <- ZIO.succeed(UUID.randomUUID())
      newPostId <- ZIO.succeed(UUID.randomUUID())
      // Create user
      _ <- execute(
             insertInto(users)(userId, userName, userEmail, userCreatedAt)
               .values((newUserId, username, email, LocalDateTime.now()))
           )
      // Create post as a draft owned by the new user
      _ <- execute(
             insertInto(posts)(postId, postTitle, postContent, postAuthorId, postPublished)
               .values((newPostId, title, content, newUserId, false))
           )
    } yield (newUserId, newPostId)

    transaction.atomically
  }

  // Custom error types
  sealed trait DatabaseError extends Throwable
  final case class UserNotFound(username: String) extends DatabaseError {
    override def getMessage: String = s"User not found: $username"
  }
  final case class DuplicateUser(username: String) extends DatabaseError {
    override def getMessage: String = s"User already exists: $username"
  }
  final case class InvalidInput(message: String) extends DatabaseError {
    override def getMessage: String = s"Invalid input: $message"
  }

  // User creation with error handling
  def createUserSafely(
    username: String,
    email: String
  ): ZIO[ConnectionPool, DatabaseError, UUID] = {
    // Input validation: fails with the first rule that is violated
    val validation = for {
      _ <- ZIO.when(username.trim.isEmpty)(ZIO.fail(InvalidInput("Username cannot be empty")))
      _ <- ZIO.when(email.trim.isEmpty)(ZIO.fail(InvalidInput("Email cannot be empty")))
      _ <- ZIO.when(!email.contains("@"))(ZIO.fail(InvalidInput("Invalid email format")))
    } yield ()

    // Named `creation` so it does not shadow DatabaseExample.createUser
    val creation = for {
      _ <- validation
      // Check existing user
      existingUser <- DatabaseExample.getUserByUsername(username)
      _ <- ZIO.when(existingUser.isDefined)(ZIO.fail(DuplicateUser(username)))
      // Create user
      newUserId <- DatabaseExample.createUser(username, email)
    } yield newUserId

    // Narrow every failure to the DatabaseError hierarchy
    creation.mapError {
      case e: DatabaseError         => e
      case _: java.sql.SQLException => InvalidInput("Database constraint violation")
      case other                    => InvalidInput(s"Unexpected error: ${other.getMessage}")
    }
  }

  // Query execution with retry functionality
  def executeWithRetry[R, A](
    query: ZIO[R, Throwable, A],
    maxRetries: Int = 3,
    delay: Duration = 1.second
  ): ZIO[R, Throwable, A] =
    query
      .retry(Schedule.exponential(delay) && Schedule.recurs(maxRetries))
      .tapError { error =>
        // Reached only after all retries have been used up
        Console.printLineError(s"Query failed after $maxRetries retries: ${error.getMessage}")
      }

  // Batch operations
  def batchUpdatePosts(
    postIds: List[UUID],
    published: Boolean
  ): ZIO[ConnectionPool, Throwable, Long] = {
    val updates = postIds.map { id =>
      update(posts)
        .set(postPublished, published)
        .where(postId === id)
    }

    // Convert each row count to Long before summing so the total matches the declared result type
    val batchTransaction = ZIO.foreach(updates)(u => execute(u)).map(_.map(_.toLong).sum)
    batchTransaction.atomically
  }

  // Connection pool monitoring
  def monitorConnectionPool: ZIO[ConnectionPool, Nothing, Unit] = {
    val monitoring = for {
      _ <- Console.printLine("Connection pool monitoring started")
      _ <- ZIO.never // Replace with actual monitoring logic
    } yield ()

    monitoring.orDie
  }
}

object DatabaseService {
  import Schema._
  import TransactionExamples._

  // Service layer implementation example
  trait UserService {
    def createUser(username: String, email: String): ZIO[Any, DatabaseError, UUID]
    def getUser(username: String): ZIO[Any, DatabaseError, Users]
    def getAllUsers: ZIO[Any, DatabaseError, List[Users]]
  }

  // The live implementation receives a ConnectionPool instance through its
  // constructor and provides it to each query, so callers need no environment
  final case class UserServiceLive(pool: ConnectionPool) extends UserService {
    private val poolLayer: ULayer[ConnectionPool] = ZLayer.succeed(pool)

    def createUser(username: String, email: String): ZIO[Any, DatabaseError, UUID] =
      createUserSafely(username, email).provide(poolLayer)

    def getUser(username: String): ZIO[Any, DatabaseError, Users] =
      DatabaseExample
        .getUserByUsername(username)
        .provide(poolLayer)
        .flatMap {
          case Some(user) => ZIO.succeed(user)
          case None       => ZIO.fail(UserNotFound(username))
        }
        .mapError {
          case e: DatabaseError => e
          case other            => InvalidInput(other.getMessage)
        }

    def getAllUsers: ZIO[Any, DatabaseError, List[Users]] = {
      val query = select(userId, userName, userEmail, userCreatedAt)
        .from(users)
        .orderBy(Ordering.Desc(userCreatedAt))

      execute(query.to((Users.apply _).tupled))
        .provide(poolLayer)
        .mapError(error => InvalidInput(error.getMessage))
    }
  }

  // Service layer ZLayer: builds UserServiceLive from the ConnectionPool in the environment
  val userServiceLayer: ZLayer[ConnectionPool, Nothing, UserService] =
    ZLayer.fromFunction(UserServiceLive.apply _)
}
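The `executeWithRetry` helper in the Transactions and Error Handling example combines `Schedule.exponential(delay)` with `Schedule.recurs(maxRetries)`. Assuming ZIO's default growth factor of 2.0 for `Schedule.exponential`, the wait before retry number n (counting from 0) is `delay * 2^n`. The plain-Scala sketch below only works through that arithmetic; the `BackoffSketch` object and its function name are hypothetical and it does not call ZIO.

```scala
// Worked arithmetic for an exponential backoff policy; hypothetical helper, no ZIO dependency.
object BackoffSketch {
  // Wait (in milliseconds) before each retry: baseMillis * factor^n for n = 0 until maxRetries.
  def retryDelaysMillis(baseMillis: Long, maxRetries: Int, factor: Double = 2.0): List[Long] =
    List.tabulate(maxRetries)(n => (baseMillis * math.pow(factor, n.toDouble)).toLong)
}
```

With the defaults used by `executeWithRetry` (a 1 second base delay and 3 retries), `BackoffSketch.retryDelaysMillis(1000L, 3)` gives waits of 1000, 2000, and 4000 ms, so a query that keeps failing is attempted four times in total and the failure surfaces after roughly 7 seconds of waiting plus the time spent in the attempts themselves.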