WIP on the engine runtime

This commit is contained in:
Pat Garrity 2025-07-29 07:31:09 -05:00
parent 96f0fab473
commit b23b6cfdea
Signed by: pfm
GPG key ID: 5CA5D21BAB7F3A76
11 changed files with 275 additions and 92 deletions

View file

@ -1,4 +1,4 @@
val scala3: String = "3.5.1" val scala3: String = "3.7.1"
ThisBuild / scalaVersion := scala3 ThisBuild / scalaVersion := scala3
ThisBuild / versionScheme := Some("semver-spec") ThisBuild / versionScheme := Some("semver-spec")
@ -10,7 +10,7 @@ ThisBuild / externalResolvers := Seq(
) )
ThisBuild / licenses := Seq( ThisBuild / licenses := Seq(
"MIT" -> url("https://garrity.co/MIT.html") "MIT" -> url("https://git.garrity.co/garrity-software/gs-test/LICENSE")
) )
val noPublishSettings = Seq( val noPublishSettings = Seq(
@ -26,25 +26,25 @@ val sharedSettings = Seq(
val Deps = new { val Deps = new {
val Cats = new { val Cats = new {
val Core: ModuleID = "org.typelevel" %% "cats-core" % "2.12.0" val Core: ModuleID = "org.typelevel" %% "cats-core" % "2.13.0"
val Effect: ModuleID = "org.typelevel" %% "cats-effect" % "3.5.4" val Effect: ModuleID = "org.typelevel" %% "cats-effect" % "3.6.3"
} }
val Fs2 = new { val Fs2 = new {
val Core: ModuleID = "co.fs2" %% "fs2-core" % "3.10.2" val Core: ModuleID = "co.fs2" %% "fs2-core" % "3.12.0"
} }
val Natchez = new { val Natchez = new {
val Core: ModuleID = "org.tpolecat" %% "natchez-core" % "0.3.6" val Core: ModuleID = "org.tpolecat" %% "natchez-core" % "0.3.8"
} }
val Gs = new { val Gs = new {
val Uuid: ModuleID = "gs" %% "gs-uuid-v0" % "0.3.0" val Uuid: ModuleID = "gs" %% "gs-uuid-v0" % "0.4.1"
val Timing: ModuleID = "gs" %% "gs-timing-v0" % "0.1.1" val Timing: ModuleID = "gs" %% "gs-timing-v0" % "0.1.2"
val Datagen: ModuleID = "gs" %% "gs-datagen-core-v0" % "0.2.0" val Datagen: ModuleID = "gs" %% "gs-datagen-core-v0" % "0.3.1"
} }
val MUnit: ModuleID = "org.scalameta" %% "munit" % "1.0.1" val MUnit: ModuleID = "org.scalameta" %% "munit" % "1.1.1"
} }
lazy val testSettings = Seq( lazy val testSettings = Seq(

View file

@ -18,7 +18,7 @@ object Check:
*/ */
def apply[A](candidate: A): Check[A] = candidate def apply[A](candidate: A): Check[A] = candidate
extension [A: ClassTag](candidate: Check[A]) extension [A](candidate: Check[A])
/** @return /** @return
* The unwrapped value of this [[Check]]. * The unwrapped value of this [[Check]].
*/ */
@ -42,6 +42,7 @@ object Check:
)( )(
using using
CanEqual[A, A], CanEqual[A, A],
ClassTag[A],
SourcePosition SourcePosition
): TestResult = ): TestResult =
Assertion.IsEqualTo.evaluate(candidate, expected) Assertion.IsEqualTo.evaluate(candidate, expected)
@ -67,6 +68,7 @@ object Check:
)( )(
using using
CanEqual[A, A], CanEqual[A, A],
ClassTag[A],
SourcePosition SourcePosition
): F[TestResult] = ): F[TestResult] =
Sync[F].delay(isEqualTo(expected)) Sync[F].delay(isEqualTo(expected))

View file

@ -2,7 +2,8 @@ package gs.test.v0.api
/** Base trait for all failures recognized by gs-test. /** Base trait for all failures recognized by gs-test.
*/ */
sealed trait TestFailure sealed trait TestFailure:
def message: String
object TestFailure: object TestFailure:
@ -44,6 +45,7 @@ object TestFailure:
*/ */
case class ExceptionThrown( case class ExceptionThrown(
cause: Throwable cause: Throwable
) extends TestFailure ) extends TestFailure:
override def message: String = cause.getMessage()
end TestFailure end TestFailure

View file

@ -1,7 +1,6 @@
package gs.test.v0.api package gs.test.v0.api
import cats.Show import cats.Show
import cats.effect.Async
/** Each group is comprised of a list of [[Test]]. This list may be empty. /** Each group is comprised of a list of [[Test]]. This list may be empty.
* *
@ -18,7 +17,7 @@ import cats.effect.Async
* @param tests * @param tests
* The list of tests in this group. * The list of tests in this group.
*/ */
final class TestGroupDefinition[F[_]: Async]( final class TestGroupDefinition[F[_]](
val name: TestGroupDefinition.Name, val name: TestGroupDefinition.Name,
val documentation: Option[String], val documentation: Option[String],
val testTags: List[Tag], val testTags: List[Tag],

View file

@ -12,6 +12,5 @@ case class SuiteExecution(
countSeen: Long, countSeen: Long,
countSucceeded: Long, countSucceeded: Long,
countFailed: Long, countFailed: Long,
countIgnored: Long,
executedAt: Instant executedAt: Instant
) )

View file

@ -0,0 +1,25 @@
package gs.test.v0.runtime.engine
object EngineConstants:
object Tracing:
val RootSpan: String = "test-group"
val BeforeGroup: String = "before-group"
val AfterGroup: String = "after-group"
val BeforeTest: String = "before-test"
val AfterTest: String = "after-test"
val InGroup: String = "in-group"
val TestSpan: String = "test"
end Tracing
object MetaData:
val TestGroupName: String = "test_group_name"
val TestExecutionId: String = "test_execution_id"
val TestName: String = "test_name"
end MetaData
end EngineConstants

View file

@ -3,32 +3,93 @@ package gs.test.v0.runtime.engine
import cats.effect.Async import cats.effect.Async
import cats.effect.Ref import cats.effect.Ref
import cats.syntax.all.* import cats.syntax.all.*
import gs.test.v0.runtime.TestExecution
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
final class EngineStats[F[_]: Async]( /** Statistics for executed tests. Used by the [[TestEngine]].
val overallDuration: Ref[F, FiniteDuration], *
val countSeen: Ref[F, Long], * @param overallDuration
val countSucceeded: Ref[F, Long], * Duration of all recorded tests.
val countFailed: Ref[F, Long], * @param countSeen
val countIgnored: Ref[F, Long] * Number of tests encountered.
) * @param countSucceeded
* Number of tests that succeeded.
* @param countFailed
* Number of tests that failed.
*/
final class EngineStats[F[_]: Async] private (
overallDuration: Ref[F, FiniteDuration],
countSeen: Ref[F, Long],
countSucceeded: Ref[F, Long],
countFailed: Ref[F, Long]
):
/** @return
* The accumulated duration of test executions.
*/
def duration: F[FiniteDuration] = overallDuration.get
/** @return
* Number of tests encountered.
*/
def seen: F[Long] = countSeen.get
/** @return
* Number of tests that succeeded.
*/
def succeeded: F[Long] = countSucceeded.get
/** @return
* Number of tests that failed.
*/
def failed: F[Long] = countFailed.get
/** Update the stats based on the results of an entire group.
*
* @param groupResult
* The [[GroupResult]] representing the group.
* @return
* Side-effect which updates statistic values.
*/
def updateForGroup(groupResult: GroupResult): F[Unit] =
for
_ <- overallDuration.update(base => base + groupResult.duration)
_ <- groupResult.testExecutions.map(updateForTest).sequence
yield ()
/** Update the stats based on the results of a single test.
*
* @param testExecution
* The [[TestExecution]] representing the test.
* @return
* Side-effect which updates statistic values.
*/
def updateForTest(testExecution: TestExecution): F[Unit] =
for
_ <- countSeen.update(_ + 1L)
_ <- testExecution.result match
case Left(_) => countFailed.update(_ + 1L)
case Right(_) => countSucceeded.update(_ + 1L)
yield ()
object EngineStats: object EngineStats:
/** Initialize a new [[EngineStats]] instance with all values set to 0.
*
* @return
* The new [[EngineStats]] instance.
*/
def initialize[F[_]: Async]: F[EngineStats[F]] = def initialize[F[_]: Async]: F[EngineStats[F]] =
for for
duration <- Ref.of(FiniteDuration(0L, TimeUnit.NANOSECONDS)) duration <- Ref.of(FiniteDuration(0L, TimeUnit.NANOSECONDS))
seen <- Ref.of(0L) seen <- Ref.of(0L)
succeeded <- Ref.of(0L) succeeded <- Ref.of(0L)
failed <- Ref.of(0L) failed <- Ref.of(0L)
ignored <- Ref.of(0L)
yield new EngineStats[F]( yield new EngineStats[F](
overallDuration = duration, overallDuration = duration,
countSeen = seen, countSeen = seen,
countSucceeded = succeeded, countSucceeded = succeeded,
countFailed = failed, countFailed = failed
countIgnored = ignored
) )
end EngineStats end EngineStats

View file

@ -4,6 +4,17 @@ import gs.test.v0.api.TestGroupDefinition
import gs.test.v0.runtime.TestExecution import gs.test.v0.runtime.TestExecution
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
/** Represents the results of executing an entire group of tests.
*
* @param name
* The name of the executed group.
* @param documentation
* The documentation for the group.
* @param duration
* The overall duration of execution.
* @param testExecutions
* List of test results.
*/
final class GroupResult( final class GroupResult(
val name: TestGroupDefinition.Name, val name: TestGroupDefinition.Name,
val documentation: Option[String], val documentation: Option[String],

View file

@ -15,6 +15,40 @@ import java.time.Instant
import natchez.EntryPoint import natchez.EntryPoint
import natchez.Span import natchez.Span
/** This class is responsible for executing suites of tests.
*
* ## How Execution Works
*
* Test execution starts at the group level, via a stream of
* [[TestGroupDefinition]]. Each group of tests is executed concurrently based
* on the [[EngineConfiguration]].
*
* ### Executing a Single Group
*
* Each [[TestGroupDefinition]] is executed by executing, in order:
*
* - The `beforeGroup` effect.
* - Each test (configurable concurrency).
* - The `afterGroup` effect.
*
* The before/after effects are described at the group level.
*
* ### Executing a Single Test
*
* Each [[TestDefinition]] is executed by executing, in order:
*
* - The `beforeEachTest` effect.
* - The test code.
* - The `afterEachTest` effect.
*
* The before/after effects are described at the group level.
*
* ## OpenTelemetry Support
*
* Each [[SuiteExecution]] produces a single trace per [[TestGroupDefinition]].
* This means that each group has a Trace ID and a tree of execution, with one
* span per test.
*/
final class TestEngine[F[_]: Async]( final class TestEngine[F[_]: Async](
val configuration: EngineConfiguration, val configuration: EngineConfiguration,
timing: Timing[F], timing: Timing[F],
@ -28,89 +62,73 @@ final class TestEngine[F[_]: Async](
suite: TestSuite, suite: TestSuite,
tests: fs2.Stream[F, TestGroupDefinition[F]] tests: fs2.Stream[F, TestGroupDefinition[F]]
): F[SuiteExecution] = ): F[SuiteExecution] =
// TODO: REPORTING -- need interface
for for
executedAt <- Async[F].delay(Instant.now(clock)) executedAt <- Async[F].delay(Instant.now(clock))
stats <- EngineStats.initialize[F] stats <- EngineStats.initialize[F]
// TODO: Just do telemetry for the whole damn thing.
_ <- tests _ <- tests
.mapAsync(configuration.groupConcurrency.toInt())(runGroup) .mapAsync(configuration.groupConcurrency.toInt())(runGroup)
.evalTap(updateGroupStats) .evalTap(stats.updateForGroup)
.evalTap(reportGroup)
.flatMap(groupResult => fs2.Stream.emits(groupResult.testExecutions))
.evalTap(updateTestStats)
.evalMap(reportTestExecution)
.compile .compile
.drain .drain
overallDuration <- stats.overallDuration.get suiteExecution <- makeSuiteExecution(suite, stats, executedAt)
countSeen <- stats.countSeen.get yield suiteExecution
countSucceeded <- stats.countSucceeded.get
countFailed <- stats.countFailed.get
countIgnored <- stats.countIgnored.get
yield SuiteExecution(
id = suiteExecutionIdGenerator.next(),
name = suite.name,
documentation = suite.documentation,
duration = overallDuration,
countSeen = countSeen,
countSucceeded = countSucceeded,
countFailed = countFailed,
countIgnored = countIgnored,
executedAt = executedAt
)
private def updateGroupStats(groupResult: GroupResult): F[Unit] = ???
private def updateTestStats(testExecution: TestExecution): F[Unit] = ???
private def reportGroup(groupResult: GroupResult): F[Unit] = ???
private def reportTestExecution(testExecution: TestExecution): F[Unit] = ???
private def runSpan[A](
name: String,
root: Span[F],
f: F[A]
): F[A] =
root.span(name).use(_ => f)
def runGroup( def runGroup(
group: TestGroupDefinition[F] group: TestGroupDefinition[F]
): F[GroupResult] = ): F[GroupResult] =
entryPoint.root("test-group").use { rootSpan => entryPoint.root(EngineConstants.Tracing.RootSpan).use { rootSpan =>
for for
_ <- rootSpan.put("test_group_name" -> group.name.show) // Augment the span with all group-level metadata.
_ <- rootSpan
.put(EngineConstants.MetaData.TestGroupName -> group.name.show)
// Start the timer for the entire group.
timer <- timing.start()
// Run the before-group logic (in its own span).
_ <- runSpan( _ <- runSpan(
"before-group", EngineConstants.Tracing.BeforeGroup,
rootSpan, rootSpan,
group.beforeGroup.getOrElse(Async[F].unit) group.beforeGroup.getOrElse(Async[F].unit)
) )
stream <- executeGroupTests(group, rootSpan)
// Execute all tests within this group.
testExecutions <- executeGroupTests(group, rootSpan)
// Run the after-group logic (in its own span).
_ <- runSpan( _ <- runSpan(
"after-group", EngineConstants.Tracing.AfterGroup,
rootSpan, rootSpan,
group.afterGroup.getOrElse(Async[F].unit) group.afterGroup.getOrElse(Async[F].unit)
) )
yield stream
}
private def executeGroupTests( // Calculate the overall elapsed time for this group.
group: TestGroupDefinition[F],
rootSpan: Span[F]
): F[GroupResult] =
rootSpan.span("group").use { groupSpan =>
for
traceId <- rootSpan.traceId.map(parseTraceId)
timer <- timing.start()
executions <- streamGroupTests(group, groupSpan).compile.toList
elapsed <- timer.checkpoint() elapsed <- timer.checkpoint()
yield new GroupResult( yield new GroupResult(
name = group.name, name = group.name,
documentation = group.documentation, documentation = group.documentation,
duration = elapsed.duration, duration = elapsed.duration,
testExecutions = executions testExecutions = testExecutions
) )
} }
private def executeGroupTests(
group: TestGroupDefinition[F],
rootSpan: Span[F]
): F[List[TestExecution]] =
rootSpan.span(EngineConstants.Tracing.InGroup).use { groupSpan =>
for
// If, for some reason, the generated span has no Trace ID, this will
// throw an exception.
traceId <- rootSpan.traceId.map(parseTraceId)
executions <- streamGroupTests(traceId, group, groupSpan).compile.toList
yield executions
}
private def streamGroupTests( private def streamGroupTests(
traceId: UUID,
group: TestGroupDefinition[F], group: TestGroupDefinition[F],
groupSpan: Span[F] groupSpan: Span[F]
): fs2.Stream[F, TestExecution] = ): fs2.Stream[F, TestExecution] =
@ -118,13 +136,33 @@ final class TestEngine[F[_]: Async](
.emits(group.tests) .emits(group.tests)
.mapAsync(configuration.testConcurrency.toInt()) { test => .mapAsync(configuration.testConcurrency.toInt()) { test =>
for for
// Generate a unique TestExecutionId for this execution.
testExecutionId <- Async[F].delay( testExecutionId <- Async[F].delay(
TestExecution.Id(testExecutionIdGenerator.next()) TestExecution.Id(testExecutionIdGenerator.next())
) )
// Start the timer for the test, including the before/after
// components.
timer <- timing.start() timer <- timing.start()
_ <- group.beforeEachTest.getOrElse(Async[F].unit)
// Run the before-test logic (in its own span).
_ <- runSpan(
EngineConstants.Tracing.BeforeTest,
groupSpan,
group.beforeEachTest.getOrElse(Async[F].unit)
)
// Run the test (in its own span).
result <- runSingleTest(testExecutionId, test, groupSpan) result <- runSingleTest(testExecutionId, test, groupSpan)
_ <- group.afterEachTest.getOrElse(Async[F].unit)
// Run the after-test logic (in its own span).
_ <- runSpan(
EngineConstants.Tracing.AfterTest,
groupSpan,
group.afterEachTest.getOrElse(Async[F].unit)
)
// Calculate the overall elapsed time for this single test.
elapsed <- timer.checkpoint() elapsed <- timer.checkpoint()
yield TestExecution( yield TestExecution(
id = testExecutionId, id = testExecutionId,
@ -133,7 +171,8 @@ final class TestEngine[F[_]: Async](
tags = test.tags, tags = test.tags,
markers = test.markers, markers = test.markers,
result = result, result = result,
traceId = ???, // TODO TraceID isn't that useful here, need SpanID
traceId = traceId,
sourcePosition = test.sourcePosition, sourcePosition = test.sourcePosition,
duration = elapsed.duration duration = elapsed.duration
) )
@ -144,13 +183,58 @@ final class TestEngine[F[_]: Async](
test: TestDefinition[F], test: TestDefinition[F],
groupSpan: Span[F] groupSpan: Span[F]
): F[Either[TestFailure, Any]] = ): F[Either[TestFailure, Any]] =
groupSpan.span("test").use { span => groupSpan.span(EngineConstants.Tracing.TestSpan).use { span =>
for for
// TODO: Constants _ <- span
_ <- span.put("test_execution_id" -> testExecutionId.show) .put(EngineConstants.MetaData.TestExecutionId -> testExecutionId.show)
_ <- span.put("test_name" -> test.name.show) _ <- span.put(EngineConstants.MetaData.TestName -> test.name.show)
result <- test.unitOfWork.doWork(span) result <- test.unitOfWork.doWork(span)
yield result yield result
} }
private def parseTraceId(candidate: Option[String]): UUID = ??? private def parseTraceId(candidate: Option[String]): UUID =
candidate.flatMap(UUID.parse) match
case Some(traceId) => traceId
case None =>
throw new IllegalArgumentException(
"Created a span with an invalid Trace ID: " + candidate
)
private def makeSuiteExecution(
suite: TestSuite,
stats: EngineStats[F],
executedAt: Instant
): F[SuiteExecution] =
for
overallDuration <- stats.duration
countSeen <- stats.seen
countSucceeded <- stats.succeeded
countFailed <- stats.failed
yield SuiteExecution(
id = suiteExecutionIdGenerator.next(),
name = suite.name,
documentation = suite.documentation,
duration = overallDuration,
countSeen = countSeen,
countSucceeded = countSucceeded,
countFailed = countFailed,
executedAt = executedAt
)
/** Run some effect as a child span for some root span.
*
* @param name
* The name of the span.
* @param root
* The root span.
* @param f
* The effect to execute in a child span.
* @return
* The contextualized effect.
*/
private def runSpan[A](
name: String,
root: Span[F],
f: F[A]
): F[A] =
root.span(name).use(_ => f)

View file

@ -1 +1 @@
sbt.version=1.10.2 sbt.version=1.11.2

View file

@ -28,6 +28,6 @@ externalResolvers := Seq(
"Garrity Software Releases" at "https://maven.garrity.co/gs" "Garrity Software Releases" at "https://maven.garrity.co/gs"
) )
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.1.0") addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.3.1")
addSbtPlugin("gs" % "sbt-garrity-software" % "0.4.0") addSbtPlugin("gs" % "sbt-garrity-software" % "0.6.0")
addSbtPlugin("gs" % "sbt-gs-semver" % "0.3.0") addSbtPlugin("gs" % "sbt-gs-semver" % "0.3.0")