Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 101 additions & 6 deletions core/src/main/scala/org/typelevel/keypool/KeyPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,17 @@ import org.typelevel.keypool.internal._
* This pools internal guarantees are that the max number of values are in the pool at any time, not
* maximum number of operations. To do the latter application level bounds should be used.
*
* A background reaper thread is kept alive for the length of the key pools life.
* A background reaper thread is kept alive for the length of the key pool's life.
*
* When resources are taken from the pool they are received as a [[Managed]]. This [[Managed]] has a
* Ref to a [[Reusable]] which indicates whether or not the pool can reuse the resource.
* `Ref` to a [[Reusable]] which indicates whether the pool can reuse the resource.
*
* Unlike [[Pool]], which is a single-key convenience wrapper, `KeyPool` partitions resources by a
* user-provided key type `A` and enforces per-key limits and accounting.
*
* @see
* [[Pool]]
*/

trait KeyPool[F[_], A, B] {

/**
Expand Down Expand Up @@ -346,38 +351,119 @@ object KeyPool {
onReaperException
)

/**
* Register a callback `f` to run after a new [[Managed]] item is created. Any error raised by
* the callback is ignored, and the created item is returned unchanged.
*
* @note
* If multiple callbacks are registered, their execution order is not guaranteed and callers
* should not rely on any particular ordering.
*/
def doOnCreate(f: B => F[Unit]): Builder[F, A, B] =
copy(kpRes = { (k: A) => this.kpRes(k).flatMap(v => Resource.eval(f(v).attempt.void.as(v))) })

copy(kpRes = { (k: A) => this.kpRes(k).flatMap(v => Resource.eval(f(v).voidError.as(v))) })

/**
* Register a callback `f` to run when a [[Managed]] item is about to be destroyed. Any error
* raised by the callback is ignored, and the item will be destroyed regardless.
*
* @note
* If multiple callbacks are registered, their execution order is not guaranteed and callers
* should not rely on any particular ordering.
*/
def doOnDestroy(f: B => F[Unit]): Builder[F, A, B] =
copy(kpRes = { (k: A) =>
this.kpRes(k).flatMap(v => Resource.make(Applicative[F].unit)(_ => f(v).attempt.void).as(v))
this.kpRes(k).flatMap(v => Resource.make(Applicative[F].unit)(_ => f(v).voidError).as(v))
})

/**
* Set the default [[Reusable]] state applied when resources are returned to the pool. This
* default can be overridden per-resource via [[Managed.canBeReused]] from
* [[KeyPool.take(k:A)* KeyPool.take]]. If not configured, the default is [[Reusable.Reuse]].
*
* @param defaultReuseState
* whether resources returned to the pool should be reused ([[Reusable.Reuse]]) or destroyed
* ([[Reusable.DontReuse]]) by default.
*/
def withDefaultReuseState(defaultReuseState: Reusable): Builder[F, A, B] =
copy(kpDefaultReuseState = defaultReuseState)

/**
* Set how long an idle resource is allowed to remain in the pool before it becomes eligible for
* eviction. If not configured, the builder defaults to
* [[KeyPool.Builder.Defaults.idleTimeAllowedInPool 30 seconds]].
*
* @param duration
* maximum idle time allowed in the pool.
*/
def withIdleTimeAllowedInPool(duration: Duration): Builder[F, A, B] =
copy(idleTimeAllowedInPool = duration)

/**
* Set the interval between eviction runs of the pool reaper. If not configured, the builder
* defaults to [[KeyPool.Builder.Defaults.durationBetweenEvictionRuns 5 seconds]].
*
* @param duration
* time between successive eviction runs.
*/
def withDurationBetweenEvictionRuns(duration: Duration): Builder[F, A, B] =
copy(durationBetweenEvictionRuns = duration)

/**
* Set the maximum number of idle resources allowed per key. If not configured, the builder
* defaults to [[KeyPool.Builder.Defaults.maxPerKey 100]].
*
* @param f
* function returning max idle count for a given key.
*/
def withMaxPerKey(f: A => Int): Builder[F, A, B] =
copy(kpMaxPerKey = f)

/**
* Set the maximum number of idle resources allowed across all keys. If not configured, the
* builder defaults to [[KeyPool.Builder.Defaults.maxIdle 100]].
*
* @param maxIdle
* maximum idle resources in the pool.
*/
def withMaxIdle(maxIdle: Int): Builder[F, A, B] =
copy(kpMaxIdle = maxIdle)

/**
* Set the maximum total number of concurrent resources permitted by the pool. If not
* configured, the builder defaults to [[KeyPool.Builder.Defaults.maxTotal 100]].
*
* @param total
* maximum total resources.
*/
def withMaxTotal(total: Int): Builder[F, A, B] =
copy(kpMaxTotal = total)

/**
* Set the [[Fairness]] policy for acquiring permits from the global semaphore controlling total
* resources. If not configured, the builder defaults to
* [[KeyPool.Builder.Defaults.fairness Fairness.Fifo]].
*
* @param fairness
* fairness policy - [[Fairness.Fifo]] or [[Fairness.Lifo]].
*/
def withFairness(fairness: Fairness): Builder[F, A, B] =
copy(fairness = fairness)

/**
* Register a handler `f` invoked for any `Throwable` observed by the background reaper; it runs
* when the reaper loop reports an error.
*
* The provided function is invoked with any `Throwable` observed in the reaper loop. If the
* handler itself fails, that failure is swallowed and thereby ignored - the reaper will
* continue running and may invoke the handler again for subsequent errors.
*/
def withOnReaperException(f: Throwable => F[Unit]): Builder[F, A, B] =
copy(onReaperException = f)

/**
* Create a `KeyPool` wrapped in a `Resource`, initializing pool internals from the configured
* builder parameters and defaults.
*/
def build: Resource[F, KeyPool[F, A, B]] = {
def keepRunning[Z](fa: F[Z]): F[Z] =
fa.onError { case e => onReaperException(e) }.attempt >> keepRunning(fa)
Expand Down Expand Up @@ -409,6 +495,11 @@ object KeyPool {
}

object Builder {

/**
* Create a new `Builder` for a `Pool` from a `Resource` that produces values stored in the
* pool.
*/
def apply[F[_]: Temporal, A, B](
res: A => Resource[F, B]
): Builder[F, A, B] = new Builder[F, A, B](
Expand All @@ -423,6 +514,10 @@ object KeyPool {
Defaults.onReaperException[F]
)

/**
* Convenience constructor that accepts `create` and `destroy` functions and builds a `Resource`
* internally.
*/
def apply[F[_]: Temporal, A, B](
create: A => F[B],
destroy: B => F[Unit]
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/scala/org/typelevel/keypool/Managed.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ import cats.effect.kernel._
*
* This knows whether it was reused or not, and has a reference that when it leaves the controlling
* scope will dictate whether it is shutdown or returned to the pool.
*
* @param value
* the underlying resource held by this `Managed` instance.
* @param isReused
* indicates whether the resource was taken from the pool (`true`) or newly created (`false`).
* @param canBeReused
* A mutable reference controlling reuse: when the `Managed` is released this `Ref` determines
* whether the resource is returned to the pool or shut down.
*
* @note
* If the caller does not modify `canBeReused` on the returned `Managed`, the pool's default reuse
* state (configured via [[KeyPool.Builder.withDefaultReuseState]]) will be used when the resource
* is released.
*
* @see
* [[Reusable]]
*/
final class Managed[F[_], A] private[keypool] (
val value: A,
Expand Down
96 changes: 93 additions & 3 deletions core/src/main/scala/org/typelevel/keypool/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ import scala.concurrent.duration._
* This pools internal guarantees are that the max number of values are in the pool at any time, not
* maximum number of operations. To do the latter application level bounds should be used.
*
* A background reaper thread is kept alive for the length of the pools life.
* A background reaper thread is kept alive for the length of the pool's life.
*
* When resources are taken from the pool they are received as a [[Managed]]. This [[Managed]] has a
* Ref to a [[Reusable]] which indicates whether or not the pool can reuse the resource.
* `Ref` to a [[Reusable]] which indicates whether the pool can reuse the resource.
*
* `Pool` is a convenience, single-key specialization of [[KeyPool]] that does not partition
* resources by key and exposes simpler `take`/`state` APIs.
*
* @see
* [[KeyPool]]
*/

trait Pool[F[_], B] {

/**
Expand Down Expand Up @@ -100,32 +105,100 @@ object Pool {
onReaperException
)

/**
* Register a callback `f` to run after a new [[Managed]] item is created. Any error raised by
* the callback is ignored, and the created item is returned unchanged.
*
* @note
* If multiple callbacks are registered, their execution order is not guaranteed and callers
* should not rely on any particular ordering.
*/
def doOnCreate(f: B => F[Unit]): Builder[F, B] =
copy(kpRes = this.kpRes.flatMap(v => Resource.eval(f(v).attempt.void.as(v))))

/**
* Register a callback `f` to run when a [[Managed]] item is about to be destroyed. Any error
* raised by the callback is ignored, and the item will be destroyed regardless.
*
* @note
* If multiple callbacks are registered, their execution order is not guaranteed and callers
* should not rely on any particular ordering.
*/
def doOnDestroy(f: B => F[Unit]): Builder[F, B] =
copy(kpRes =
this.kpRes.flatMap(v => Resource.make(Applicative[F].unit)(_ => f(v).attempt.void).as(v))
)

/**
* Set the default [[Reusable]] state applied when resources are returned to the pool. This
* default can be overridden per-resource via [[Managed.canBeReused]] from [[Pool.take]]. If not
* configured, the default is [[Reusable.Reuse]].
*
* @param defaultReuseState
* whether resources returned to the pool should be reused ([[Reusable.Reuse]]) or destroyed
* ([[Reusable.DontReuse]]) by default.
*/
def withDefaultReuseState(defaultReuseState: Reusable): Builder[F, B] =
copy(kpDefaultReuseState = defaultReuseState)

/**
* Set how long an idle resource is allowed to remain in the pool before it becomes eligible for
* eviction. If not configured, the builder defaults to 30 seconds.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+

*
* @param duration
* maximum idle time allowed in the pool.
*/
def withIdleTimeAllowedInPool(duration: Duration): Builder[F, B] =
copy(idleTimeAllowedInPool = duration)

/**
* Set the interval between eviction runs of the pool reaper. If not configured, the builder
* defaults to 5 seconds.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+

*
* @param duration
* time between successive eviction runs.
*/
def withDurationBetweenEvictionRuns(duration: Duration): Builder[F, B] =
copy(durationBetweenEvictionRuns = duration)

/**
* Set the maximum number of idle resources allowed across the pool. If not configured, the
* builder defaults to 100.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+

*
* @param maxIdle
* maximum idle resources in the pool.
*/
def withMaxIdle(maxIdle: Int): Builder[F, B] =
copy(kpMaxIdle = maxIdle)

/**
* Set the maximum total number of concurrent resources permitted by the pool. If not
* configured, the builder defaults to 100.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+

*
* @param total
* maximum total resources.
*/
def withMaxTotal(total: Int): Builder[F, B] =
copy(kpMaxTotal = total)

/**
* Set the [[Fairness]] policy for acquiring permits from the global semaphore controlling total
* resources. If not configured, the builder defaults to [[Fairness.Fifo]].
*
* @param fairness
* fairness policy - [[Fairness.Fifo]] or [[Fairness.Lifo]].
*/
def withFairness(fairness: Fairness): Builder[F, B] =
copy(fairness = fairness)

/**
* Register a handler `f` invoked for any `Throwable` observed by the background reaper; it runs
* when the reaper loop reports an error.
*
* The provided function is invoked with any `Throwable` observed in the reaper loop. If the
* handler itself fails, that failure is swallowed and thereby ignored - the reaper will
* continue running and may invoke the handler again for subsequent errors.
*/
def withOnReaperException(f: Throwable => F[Unit]): Builder[F, B] =
copy(onReaperException = f)

Expand All @@ -142,6 +215,14 @@ object Pool {
onReaperException = onReaperException
)

/**
* Create a `Pool` wrapped in a `Resource`, initializing pool internals from the configured
* builder parameters and defaults.
*
* @note
* The implementation is a thin single-key specialization that delegates to
* [[KeyPool.Builder]] under the hood.
*/
def build: Resource[F, Pool[F, B]] = {
toKeyPoolBuilder.build.map { inner =>
new Pool[F, B] {
Expand All @@ -153,6 +234,11 @@ object Pool {
}

object Builder {

/**
* Create a new `Builder` for a `Pool` from a `Resource` that produces values stored in the
* pool.
*/
def apply[F[_]: Temporal, B](
res: Resource[F, B]
): Builder[F, B] = new Builder[F, B](
Expand All @@ -166,6 +252,10 @@ object Pool {
Defaults.onReaperException[F]
)

/**
* Convenience constructor that accepts `create` and `destroy` functions and builds a `Resource`
* internally.
*/
def apply[F[_]: Temporal, B](
create: F[B],
destroy: B => F[Unit]
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/typelevel/keypool/Reusable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
package org.typelevel.keypool

/**
* Reusable is a Coproduct of the two states a Resource can be in at the end of its lifetime.
* Reusable is a Coproduct of the two states a `Resource` can be in at the end of its lifetime.
*
* If it is Reuse then it will be attempted to place back in the pool, if it is in DontReuse the
* resource will be shutdown.
* If it is `Reusable.Reuse` then it will be attempted to place back in the pool, if it is in
* `Reusable.DontReuse` the resource will be shutdown.
*/
sealed trait Reusable
object Reusable {
Expand Down
5 changes: 5 additions & 0 deletions docs/classes/directory.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
laika.navigationOrder = [
pool.md
keypool.md
misc.md
]
1 change: 1 addition & 0 deletions docs/classes/examples/directory.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
laika.targetFormats = []
Loading
Loading