Giter Site home page Giter Site logo

vitaliihonta / zio-temporal Goto Github PK

View Code? Open in Web Editor NEW
60.0 4.0 9.0 702 KB

Build invincible apps with ZIO and https://temporal.io

Home Page: https://zio-temporal.vhonta.dev

License: Apache License 2.0

Scala 97.79% Makefile 0.40% JavaScript 1.68% CSS 0.13% HTML 0.01%
scala temporal-workflow workflow-management zio functional-programming scala3

zio-temporal's People

Contributors

carlosedp avatar grouzen avatar hoangmaihuy avatar oridag avatar vitaliihonta avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

zio-temporal's Issues

SLF4J messages when using default zio-logging backend

In my project I'm using the default zio-logging backend and when starting the app I get the following messages:

[47/47] hello.run
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
timestamp=2023-02-24T18:28:23.035899-03:00 level=INFO thread=zio-fiber-7 message="Started sample-worker"?
timestamp=2023-02-24T18:28:23.078098-03:00 level=INFO thread=zio-fiber-6 message="HTTP Server started at http://localhost:8082"

Since SLF4J is not used, can't these be supressed?

Increase test coverage

Areas to improve:

  • Testing zio-temporal-protobuf module
  • More tests in integration-tests (just examples using zio-temporal functionality)
  • ... Anything else?

Lib cannot load .proto files when using scalapb plugin thru Mill build tool

I use Mill as a build tool and while looking into protobuf usage in zio-temporal, I faced an error that generates encoding errors. Found-out zio-temporal could not load the proto file from my project.

I replicated the build.sbt file from https://github.com/vitaliihonta/func-scala-2022-zio-temporal to a build.sc (mill build file) and saw it doesn't load as seen in the logs below.

This issue is more of a question on how to debug this in the Mill build to fix it's scalapb plugin. Where does this proto loading comes from? The scalapb path?

❯ ./mill -i show cryptostock.run
[1/1] show
[1/1] show > [68/68] cryptostock.run
SLF4J: A number (19) of logging calls during the initialization phase have been intercepted and are
SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
SLF4J: See also http://www.slf4j.org/codes.html#replay
15:39:57.697 [ZScheduler-Worker-6] TRACE z.t.p.i.ProtoFileObjectAutoLoader$ - Provided exclude prefixes: [org/scalatools/,com.google.protobuf,com/google/type/,sbt,com.uber.m3,gogoproto/,com.google.geo,magnolia,org/codehaus/mojo/,com/google/longrunning/,com.cronutils,com.google.thirdparty,xsbt/,com.typesafe.config,distage,com/google/rpc/,logstage/,com/google/common/,akka/,com/thoughtworks/,com.google.logging,org/checkerframework/,zio/temporal/,org.slf4j,com/sun/,io/perfmark/,akka,android/,io.temporal,org.checkerframework,sun,com.google.api,com/google/errorprone/,com/cronutils/,org.scalatools,com/google/geo/,com.google.rpc,com.sun,org.LatencyUtils,izumi,org.reflections,com.google.errorprone,xsbti,xsbti/,sbt/,com/fasterxml/,logstage,pureconfig/,pureconfig,scala/,org/reflections/,com.thoughtworks,com/google/protobuf/,java,com/google/cloud/,com.google.common,com/uber/m3/,jdk/,cats/,izumi/,com.google.longrunning,javax,org/LatencyUtils/,org/slf4j/,org.codehaus.mojo,com.google.gson,org.HdrHistogram,com/google/logging/,com/google/gson/,com/typesafe/config/,io.grpc,zio,xsbt,net/sf/cglib/,io/micrometer/,com.google.type,javax/,com/google/api/,scala,magnolia/,zio/,sun/,io.micrometer,org/HdrHistogram/,com/google/thirdparty/,io.perfmark,mercator,io/temporal/,com.google.cloud,zio.temporal,cats,mercator/,META-INF,org/apache/ivy/,java/,org.apache.ivy,distage/,io/grpc/,com.fasterxml,jdk,module-info.class,META-INF/,gogoproto,android,net.sf.cglib]
15:39:57.719 [ZScheduler-Worker-6] INFO  org.reflections.Reflections - Reflections took 8 ms to scan 0 urls, producing 0 keys and 0 values
15:39:57.728 [ZScheduler-Worker-6] TRACE z.t.p.i.ProtoFileObjectAutoLoader$ - Found subtypes of GeneratedFileObject: []
15:39:57.729 [ZScheduler-Worker-6] INFO  z.t.p.i.ProtoFileObjectAutoLoader$ - Loaded 0 GeneratedFileObject(s): []
15:39:57.916 [zio-default-blocking-1] INFO  i.t.s.WorkflowServiceStubsImpl - Created WorkflowServiceStubs for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=1, target=127.0.0.1:7233}}
timestamp=2023-04-17T18:39:58.476190Z level=INFO thread=#zio-fiber-6 message="Going to create an exchange order" location=com.cryptostock.Main$.run.exampleFlow.macro file=Main.scala line=19
15:39:58.486 [zio-default-blocking-2] INFO  zio-slf4j-logger - timestamp=2023-04-17T15:39:58.483441-03:00 level=INFO thread=zio-fiber-6 message="Going to create an exchange order"
timestamp=2023-04-17T18:39:58.510999Z level=INFO thread=#zio-fiber-6 message="Going to trigger workflow" location=com.cryptostock.clients.ExchangeClientService.exchangeOrder.macro file=ExchangeClientService.scala line=39
15:39:58.511 [zio-default-blocking-2] INFO  zio-slf4j-logger - timestamp=2023-04-17T15:39:58.511369-03:00 level=INFO thread=zio-fiber-6 message="Going to trigger workflow"
timestamp=2023-04-17T18:39:58.600419Z level=INFO thread=#zio-fiber-6 message="Order placed" location=com.cryptostock.Main$.run.exampleFlow.macro file=Main.scala line=24
15:39:58.601 [zio-default-blocking-2] INFO  zio-slf4j-logger - timestamp=2023-04-17T15:39:58.600896-03:00 level=INFO thread=zio-fiber-6 message="Order placed"
15:39:58.726 [workflow-method-10b07a55-347f-4119-9b59-b1e4edffcc14-53a41b05-e116-4b6e-9af1-d92f2cf6c16c] ERROR i.t.i.sync.WorkflowExecuteRunnable - Workflow execution failure WorkflowId=10b07a55-347f-4119-9b59-b1e4edffcc14, RunId=53a41b05-e116-4b6e-9af1-d92f2cf6c16c, WorkflowType=ExchangeWorkflow
io.temporal.common.converter.DataConverterException: Unable to convert metadata {
  key: "encoding"
  value: "binary/protobuf"
}
metadata {
  key: "messageType"
  value: "com.cryptostock.exchange.ExchangeOrderRequest"
}
data: "\n\022\t\344N\026\214\235!\315\356\021\307\220#\267;$\027\252\022\b\b\001\022\004\n\002\001\244\030\000"
 to com.cryptostock.exchange.ExchangeOrderRequest when parsing:"
	�N��!��ǐ#�;$�
�" into following types: [class com.cryptostock.exchange.ExchangeOrderRequest]
	at io.temporal.common.converter.DefaultDataConverter.fromPayload(DefaultDataConverter.java:140)
	at io.temporal.common.converter.DataConverter.arrayFromPayloads(DataConverter.java:105)
	at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:278)
	at io.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:68)
	at io.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:135)
	at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:102)
	at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
	at io.temporal.worker.WorkerFactory.lambda$newWorkflowThreadExecutor$7(WorkerFactory.java:392)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: zio.temporal.protobuf.ProtobufPayloadException: Unable to convert metadata {
  key: "encoding"
  value: "binary/protobuf"
}
metadata {
  key: "messageType"
  value: "com.cryptostock.exchange.ExchangeOrderRequest"
}
data: "\n\022\t\344N\026\214\235!\315\356\021\307\220#\267;$\027\252\022\b\b\001\022\004\n\002\001\244\030\000"
 to com.cryptostock.exchange.ExchangeOrderRequest
	at zio.temporal.protobuf.ScalapbPayloadConverter.getCompanion$$anonfun$1(ScalapbPayloadConverter.scala:158)
	at scala.collection.immutable.HashMap.getOrElse(HashMap.scala:678)
	at zio.temporal.protobuf.ScalapbPayloadConverter.getCompanion(ScalapbPayloadConverter.scala:158)
	at zio.temporal.protobuf.ScalapbPayloadConverter.fromData(ScalapbPayloadConverter.scala:113)
	at io.temporal.common.converter.DefaultDataConverter.fromPayload(DefaultDataConverter.java:136)
	... 12 common frames omitted
15:39:58.774 [workflow-method-10b07a55-347f-4119-9b59-b1e4edffcc14-53a41b05-e116-4b6e-9af1-d92f2cf6c16c] ERROR i.t.i.sync.WorkflowExecuteRunnable - Workflow execution failure WorkflowId=10b07a55-347f-4119-9b59-b1e4edffcc14, RunId=53a41b05-e116-4b6e-9af1-d92f2cf6c16c, WorkflowType=ExchangeWorkflow
io.temporal.common.converter.DataConverterException: Unable to convert metadata {
  key: "encoding"
  value: "binary/protobuf"
}
metadata {
  key: "messageType"
  value: "com.cryptostock.exchange.ExchangeOrderRequest"
}
data: "\n\022\t\344N\026\214\235!\315\356\021\307\220#\267;$\027\252\022\b\b\001\022\004\n\002\001\244\030\000"
 to com.cryptostock.exchange.ExchangeOrderRequest when parsing:"
	�N��!��ǐ#�;$�
�" into following types: [class com.cryptostock.exchange.ExchangeOrderRequest]
	at io.temporal.common.converter.DefaultDataConverter.fromPayload(DefaultDataConverter.java:140)
	at io.temporal.common.converter.DataConverter.arrayFromPayloads(DataConverter.java:105)
	at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:278)
	at io.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:68)
	at io.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:135)
	at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:102)
	at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
	at io.temporal.worker.WorkerFactory.lambda$newWorkflowThreadExecutor$7(WorkerFactory.java:392)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: zio.temporal.protobuf.ProtobufPayloadException: Unable to convert metadata {
  key: "encoding"
  value: "binary/protobuf"
}
metadata {
  key: "messageType"
  value: "com.cryptostock.exchange.ExchangeOrderRequest"
}
data: "\n\022\t\344N\026\214\235!\315\356\021\307\220#\267;$\027\252\022\b\b\001\022\004\n\002\001\244\030\000"
 to com.cryptostock.exchange.ExchangeOrderRequest
	at zio.temporal.protobuf.ScalapbPayloadConverter.getCompanion$$anonfun$1(ScalapbPayloadConverter.scala:158)
	at scala.collection.immutable.HashMap.getOrElse(HashMap.scala:678)
	at zio.temporal.protobuf.ScalapbPayloadConverter.getCompanion(ScalapbPayloadConverter.scala:158)
	at zio.temporal.protobuf.ScalapbPayloadConverter.fromData(ScalapbPayloadConverter.scala:113)
	at io.temporal.common.converter.DefaultDataConverter.fromPayload(DefaultDataConverter.java:136)
	... 12 common frames omitted

The build.sc file is below:

import mill._, mill.scalalib._, mill.scalalib.scalafmt._
import $ivy.`com.lihaoyi::mill-contrib-scalapblib:`, contrib.scalapblib._

object versions {
  val scala3      = "3.1.2"
  val zio         = "2.0.0"
  val ziologging  = "2.0.0"
  val ziotemporal = "0.1.0-RC5"
}

object cryptostock extends SbtModule with ScalafmtModule with ScalaPBModule {
  def scalaVersion = versions.scala3
  def ivyDeps = Agg(
    ivy"dev.zio::zio:${versions.zio}",
    ivy"dev.zio::zio-logging:${versions.ziologging}",
    ivy"dev.zio::zio-logging-slf4j:${versions.ziologging}",
    ivy"dev.vhonta::zio-temporal-core:${versions.ziotemporal}",
    ivy"dev.vhonta::zio-temporal-protobuf:${versions.ziotemporal}",
    ivy"com.thesamet.scalapb::scalapb-runtime:0.11.11",
    ivy"ch.qos.logback:logback-classic:1.2.11"
  )
  object test extends Tests {
    def testFramework = T("zio.test.sbt.ZTestFramework")
    def ivyDeps = Agg(
      ivy"dev.zio::zio-test:${versions.zio}",
      ivy"dev.zio::zio-test-sbt:${versions.zio}",
      ivy"dev.vhonta::zio-temporal-testkit:${versions.ziotemporal}"
    )
  }
  def scalaPBVersion = "0.11.11"
  def scalaPBSources = T.sources {
    millSourcePath / "src" / "main" / "protobuf"
  }
  def scalaPBIncludePath = T.sources(Seq(scalaPBUnpackProto()))
  def scalaPBFlatPackage = true
  def scalaPBGrpc        = false
}

Enhancements: NewsSync-related sample

Protobuf:

  • Support Option proto type
  • Add LocalDate proto type (as unix time)
  • Support LocalTime/ZoneId proto type

Workflow:

  • ZWorkflow.getLogger without getClass
  • ZWorkflowState type-specific syntax (for instance, +=, -= for numbers, something for Map)

ZAsync:

  • ZAsync.foreachPar/Discard
  • ZAsync.collectAll
  • ZAsync.ignore
  • ZAsync.unit
  • ZAsync.option
  • ZAsync.tap/Error

Misc:

  • Improve StubProxy error message (and add a note to FAQ)
  • Add ZWorkflow.listExecutions
  • More friendly config names (ZIO_TEMPORAL_ZWORKFLOWSERVICESTUBS_SERVERURL => ZIO_TEMPORAL_ZWORKFLOW_SERVICE_STUBS_SERVER_URL, etc.)

Having an activity with ZIO effect breaks type safety

When creating activities with ZIO effects that might fail, the return value of ZActivity.run is Either[Failure,Success].

@activityInterface
trait EchoActivity:
  @activityMethod
  def echo(msg: String): Either[Exception, String]

// Activity Implementation
class EchoActivityImpl(
  implicit options: ZActivityOptions[Any],
) extends EchoActivity:
  override def echo(msg: String): Either[Exception, String] =
    ZActivity.run:
      for
        _ <- ZIO.logDebug(s"Received message: $msg")
        newMsg <- eventuallyFail(s"ACK: $msg", 30)
      yield newMsg
      // This returns a ZIO[Any, Exception, String] which is mapped to an Either in `.run`

Like above, having the return values in my activity signature (which is required) and implementation as Either[Failure,Success], throw a Temporal serialization error:

timestamp=2023-03-21T09:24:04.549352-03:00 level=ERROR thread=zio-fiber-0 message="" cause=Exception in thread "zio-fiber-6" zio.temporal.TemporalClientError: TemporalClientError(io.temporal.client.WorkflowFailedException: Workflow execution {workflowId='echo-b3794948-b558-48ab-a76d-8dacadabe3b4', runId='6ce905f5-a64c-415b-bedf-7ed575bf625c', workflowType='EchoWorkflow'} failed. Metadata: {closeEventType='EVENT_TYPE_WORKFLOW_EXECUTION_FAILED', retryState='RETRY_STATE_RETRY_POLICY_NOT_SET', workflowTaskCompletedEventId=10'} cause=io.temporal.failure.ApplicationFailure: message='class java.util.LinkedHashMap cannot be cast to class scala.util.Either (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; scala.util.Either is in unnamed module of loader 'app')', type='java.lang.ClassCastException', nonRetryable=false)
	at zio.temporal.internal.TemporalInteraction$.fromFuture.macro(TemporalInteraction.scala:27)
	at zio.temporal.internal.TemporalInteraction$.fromFuture.macro(TemporalInteraction.scala:28)
	at <empty>.Client.workflowResultZIO(zio-temporal-activity-retry.scala:125)
	at zio.temporal.worker.ZWorkerFactory.use.macro(ZWorkerFactory.scala:27)
	at zio.temporal.worker.ZWorkerFactory.use.macro(ZWorkerFactory.scala:28)
	at <empty>.Main.run.program(zio-temporal-activity-retry.scala:142)
	at <empty>.Main.run(zio-temporal-activity-retry.scala:144)

The workaround is defining the signature as Any like:

@activityInterface
trait EchoActivity:
  @activityMethod
  def echo(msg: String): Any

// Activity Implementation
class EchoActivityImpl(
  implicit options: ZActivityOptions[Any],
) extends EchoActivity:
  override def echo(msg: String): Any =
    ZActivity.run:
      for
        _ <- ZIO.logDebug(s"Received message: $msg")
        newMsg <- eventuallyFail(s"ACK: $msg", 30)
      yield newMsg

Which kinda works but the return value from Temporal is still an Either wrapped in json as seen in the logs below requiring something like a cast to extract the correct value.

❯ scli zio-temporal-activity-retry.scala
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
timestamp=2023-03-21T09:22:42.818928-03:00 level=INFO thread=zio-fiber-7 message="Started sample-worker"
timestamp=2023-03-21T09:22:43.165645-03:00 level=INFO thread=zio-fiber-6 message="Will submit message "testMsg""
timestamp=2023-03-21T09:22:43.427282-03:00 level=DEBUG thread=zio-fiber-27 message="Received message: testMsg"
timestamp=2023-03-21T09:22:43.523249-03:00 level=INFO thread=zio-fiber-6 message="The workflow result: {value=ACK: testMsg, left=false, right=true}"

The full code is in https://gist.github.com/carlosedp/61703d5c436dc50ec5876249fbbbed31

Improve compile-time error messages

  1. When using ZChildWorkflowStub (instead of a concrete ZChildWorkflowStub.Of[A] inside ZChildWorkflowStub.execute (and etc.), the error message is obscure:
Exception occurred while executing macro expansion.
java.util.NoSuchElementException: head of empty list
    at scala.collection.immutable.Nil$.head(List.scala:662)
    at scala.collection.immutable.Nil$.head(List.scala:661)
    at zio.temporal.internal.InvocationMacroUtils$MethodInvocation.selectJavaReprOf(InvocationMacroUtils.scala:63)
    at zio.temporal.workflow.ZChildWorkflowExecutionSyntax$.executeImpl(ZChildWorkflowExecutionSyntax.scala:26)

This can definitely be improved

Proxied methods of interface Activity should not be invoked at runtime! But invoked public abstract void Activity.act()

Reproducer:

import zio.*
import zio.temporal.*
import zio.temporal.testkit.*
import zio.temporal.workflow.*

object Repro extends ZIOApp {

  override def run: ZIO[Environment, Any, Unit] = e

  val environmentTag: EnvironmentTag[Environment] = EnvironmentTag[Environment]

  val e = for {
    testEnv <- ZIO.service[ZTestWorkflowEnvironment[Any]]
    worker <- testEnv.newWorker("q")
    _ <- worker.addWorkflow[WFImpl].fromClass
    _ <- worker.addActivityImplementation(ActivityImpl)
    _ <- testEnv.start
    wfStub <- testEnv.workflowClient.newWorkflowStub[WF]
      .withTaskQueue("q")
      .withWorkflowId("wid")
      .withWorkflowRunTimeout(1.minute)
      .build
    _ <- ZWorkflowStub.execute(wfStub.run())


  } yield ()


  override type Environment = ZTestWorkflowEnvironment[Any]


  override def bootstrap: ZLayer[ZIOAppArgs, Any, Environment] = ZLayer.make[ZTestWorkflowEnvironment[Any]](
    ZTestWorkflowEnvironment.make[Any],
    ZTestEnvironmentOptions.default
  )


}

@activityInterface
trait Activity {
  @activityMethod
  def act(): Unit
}

object ActivityImpl extends Activity {
  override def act(): Unit = println("act")
}

@workflowInterface
trait WF {
  @workflowMethod
  def run(): Unit
}

class WFImpl extends WF {
  override def run(): Unit =
    ZWorkflow.newLocalActivityStub[Activity]
      .withStartToCloseTimeout(10.seconds)
      .build.act()
}

Apps generate exception when running from assembly JAR

I'm trying to generate assembly JARs for some applications(to run in a Docker container for example) and when run they generate exceptions.

Sample:
https://gist.github.com/carlosedp/8af4ec212bd987335c47933bee4c8db0

❯ scala-cli package -f zio-temporal.scala -o ziotemporal.jar --assembly

Wrote /Users/cdepaula/repos/scala-playground/zio/ziotemporal.jar, run it with
  ./ziotemporal.jar

❯ ll ziotemporal.jar
.rwxr-xr-x 57Mi cdepaula 13 Mar 09:28 ziotemporal.jar*

Error:

https://pastebin.com/TTEEJq49

On another application from https://github.com/carlosedp/zio-temporal-hello, I generate the Assembly and it returns:

https://pastebin.com/qdiKhkaB

Both run fine when I use the build tool run, like scala-cli zio-temporal.scala for the GIST and ./mill run worker.run.

Async.function does not like Scala lambdas

I got bit by a problem with the reflection logic that attempts to determine whether Async.function is being passed a method reference in MethodReferenceDisassembler#isAsyncJava. Turns out that a method reference in Scala has a different result from getImplMethodKind. It returns MethodHandleInfo.REF_invokeStatic instead of MethodHandleInfo.REF_invokeInterface. We had to invoke this through a small java shim to get it to function correctly and not execute directly on the local workflow executor.

I think ztemporal may have the same problem. It also appears to me to call Async.function passing a Scala lambda expression.
temporalio/sdk-java#1007 (comment)

I have not tested this myself yet, but it certainly looks like this is a problem. I'd like to start using zio-temporal soon, so hopefully I can verify and maybe find a solution.

Allow passing typed search attributes to child workflow

With the Java SDK one can do:

  val sa = Workflow.getTypedSearchAttributes
  val optionsBuilder = ChildWorkflowOptions.newBuilder().setTypedSearchAttributes(sa)

With the current zio-temporal API, this is not possible, or at least I couldn't find a way to do that

ZWorkflowOptions errors if options are not in correct order

While migrating from 0.5 to 0.6, I gound that ZWorkflowOptions is positional.

If I setup as:

client.newWorkflowStub[EchoWorkflow](
                    ZWorkflowOptions
                        .withTaskQueue(TemporalQueues.echoQueue)
                        .withWorkflowId(workflowID)
                        .withWorkflowExecutionTimeout(60.seconds)
                        .withWorkflowRunTimeout(10.seconds)
                        .withRetryOptions(
                            ZRetryOptions.default
                                .withMaximumAttempts(3)
                                .withInitialInterval(300.millis)
                                .withBackoffCoefficient(1)
                        )
                )

I get the error:

[error] -- [E008] Not Found Error: /Users/cdepaula/repos/scala/zio-temporal-hello/webclient/src/WebClient.scala:26:25
[error] 25 |                    ZWorkflowOptions
[error] 26 |                        .withTaskQueue(TemporalQueues.echoQueue)
[error]    |                    ^
[error]    |value withTaskQueue is not a member of object zio.temporal.workflow.ZWorkflowOptions
[error] one error found
1 targets failed

But shuffling up the withWorkflowId, it works:

client.newWorkflowStub[EchoWorkflow](
                    ZWorkflowOptions
                        .withWorkflowId(workflowID)
                        .withTaskQueue(TemporalQueues.echoQueue)
                        .withWorkflowExecutionTimeout(60.seconds)
                        .withWorkflowRunTimeout(10.seconds)
                        .withRetryOptions(
                            ZRetryOptions.default
                                .withMaximumAttempts(3)
                                .withInitialInterval(300.millis)
                                .withBackoffCoefficient(1)
                        )
                )

I think the order shouldn't matter.

Migrate to zio.ZLayerAspect

We don't need a custom ZLayerAspect implementation as ZIO recently introduced its own.
We need to remove our own and migrate to the ZIO's ZLayer aspects once zio/zio#8391 is released

Temporal Tutorial

It would be great to have simple starters examples (same as in zio-temporal-samples repo)
As mentioned by @carlosedp :

  • Simple Temporal worker/client app with one workflow
  • Separate Temporal apps for worker/client and workflow + activity
  • Separate Temporal apps for worker/client and workflow + activity with retry
  • Temporal workflow/activity test

It could be a tutorial section on the website

Get rid of noisy reflections warning

Running temporal workflows via sbt/mill in combination with zio-temporal-protobuf produces a noisy warning (which doesn't affect the code BTW).

❯ ./mill -i show cryptostock.run
[1/1] show > [67/67] cryptostock.run
18:42:58.992 [ZScheduler-Worker-3] TRACE z.t.p.i.ProtoFileObjectAutoLoader$ - Provided exclude prefixes: [org/scalatools/,com.google.protobuf,com/google/type/,sbt,com.uber.m3,gogoproto/,com.google.geo,magnolia,org/codehaus/mojo/,com/google/longrunning/,com.cronutils,com.google.thirdparty,xsbt/,com.typesafe.config,distage,com/google/rpc/,logstage/,com/google/common/,akka/,com/thoughtworks/,com.google.logging,org/checkerframework/,zio/temporal/,org.slf4j,com/sun/,io/perfmark/,akka,android/,io.temporal,org.checkerframework,sun,com.google.api,com/google/errorprone/,com/cronutils/,org.scalatools,com/google/geo/,com.google.rpc,com.sun,org.LatencyUtils,izumi,org.reflections,com.google.errorprone,xsbti,xsbti/,sbt/,com/fasterxml/,logstage,pureconfig/,pureconfig,scala/,org/reflections/,com.thoughtworks,com/google/protobuf/,java,com/google/cloud/,com.google.common,com/uber/m3/,jdk/,cats/,izumi/,com.google.longrunning,javax,org/LatencyUtils/,org/slf4j/,org.codehaus.mojo,com.google.gson,org.HdrHistogram,com/google/logging/,com/google/gson/,com/typesafe/config/,io.grpc,zio,xsbt,net/sf/cglib/,io/micrometer/,com.google.type,javax/,com/google/api/,scala,magnolia/,zio/,sun/,io.micrometer,org/HdrHistogram/,com/google/thirdparty/,io.perfmark,mercator,io/temporal/,com.google.cloud,zio.temporal,cats,mercator/,META-INF,org/apache/ivy/,java/,org.apache.ivy,distage/,io/grpc/,com.fasterxml,jdk,module-info.class,META-INF/,gogoproto,android,net.sf.cglib]
18:42:59.228 [ForkJoinPool.commonPool-worker-23] WARN  org.reflections.Reflections - could not create Vfs.Dir from url. ignoring the exception and continuing
org.reflections.ReflectionsException: could not create Vfs.Dir from url, no matching UrlType was found [file:/home/carlosedp/repos/func-scala-2022-zio-temporal/cryptostock/compile-resources]
either use fromURL(final URL url, final List<UrlType> urlTypes) or use the static setDefaultURLTypes(final List<UrlType> urlTypes) or addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:113)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:95)
        at org.reflections.Reflections.lambda$scan$2(Reflections.java:176)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1707)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
        at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)```

Cannot build project using zio-temporal

Hello! I was really excited to try zio-temporal for a project but from one day to the next I was unable to compile my project. Now I am unable to compile any project, on any computer I have tried (one MacBook and one Thinkpad running Ubuntu), where I include zio-temporal. I did eventually find a workaround though.

Minimum Example

  1. Create a new project using sbt new scala/hello-world.g8
  2. Use following settings in build.sbt:
lazy val temporalCore = "dev.vhonta" %% "zio-temporal-core" % "0.1.0-RC6"

libraryDependencies ++= Seq(
  "org.scala-lang.modules" %% "scala-parser-combinators" % "2.1.1",
  temporalCore
)
  1. Run sbt compile in repo.

Expected Behaviour
My project compiles as it should

Actual Behaviour
I get the following error:

[warn]  Note: Unresolved dependencies path:
[error] sbt.librarymanagement.ResolveException: Error downloading $com.google.protobuf:protobuf-java:
[error]   Not found
[error]   Not found
[error]   not found: /home/ragge/.ivy2/local$com.google.protobuf/protobuf-java/ivys/ivy.xml
[error]   not found: https://repo1.maven.org/maven2/$com/google/protobuf/protobuf-java//protobuf-java-.pom
[error]         at lmcoursier.CoursierDependencyResolution.unresolvedWarningOrThrow(CoursierDependencyResolution.scala:344)
[error]         at lmcoursier.CoursierDependencyResolution.$anonfun$update$38(CoursierDependencyResolution.scala:313)
[error]         at scala.util.Either$LeftProjection.map(Either.scala:573)
[error]         at lmcoursier.CoursierDependencyResolution.update(CoursierDependencyResolution.scala:313)
[error]         at sbt.librarymanagement.DependencyResolution.update(DependencyResolution.scala:60)
[error]         at sbt.internal.LibraryManagement$.resolve$1(LibraryManagement.scala:59)
[error]         at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$12(LibraryManagement.scala:133)
[error]         at sbt.util.Tracked$.$anonfun$lastOutput$1(Tracked.scala:73)
[error]         at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$20(LibraryManagement.scala:146)
[error]         at scala.util.control.Exception$Catch.apply(Exception.scala:228)
[error]         at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$11(LibraryManagement.scala:146)
[error]         at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$11$adapted(LibraryManagement.scala:127)
[error]         at sbt.util.Tracked$.$anonfun$inputChangedW$1(Tracked.scala:219)
[error]         at sbt.internal.LibraryManagement$.cachedUpdate(LibraryManagement.scala:160)
[error]         at sbt.Classpaths$.$anonfun$updateTask0$1(Defaults.scala:3687)
[error]         at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[error]         at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
[error]         at sbt.std.Transform$$anon$4.work(Transform.scala:68)
[error]         at sbt.Execute.$anonfun$submit$2(Execute.scala:282)
[error]         at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:23)
[error]         at sbt.Execute.work(Execute.scala:291)
[error]         at sbt.Execute.$anonfun$submit$1(Execute.scala:282)
[error]         at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[error]         at sbt.CompletionService$$anon$2.call(CompletionService.scala:64)
[error]         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
[error]         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
[error]         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
[error]         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
[error]         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
[error]         at java.base/java.lang.Thread.run(Thread.java:1589)
[error] (update) sbt.librarymanagement.ResolveException: Error downloading $com.google.protobuf:protobuf-java:

Workaround
The problem seems to be (to me) that when sbt tries to resolve the transitive dependency com.google.protobuf:protobuf-java it somehow adds $com.google.protobuf:protobuf-java as a dependency. I managed to get my projects compiling by simply adding the following to the example above:

lazy val temporalCore = "dev.vhonta" %% "zio-temporal-core" % "0.1.0-RC6"
libraryDependencies ++= Seq(
  "org.scala-lang.modules" %% "scala-parser-combinators" % "2.1.1",
  temporalCore
)
excludeDependencies += ExclusionRule("$com.google.protobuf","protobuf-java")//<-- What I added

I know too little about sbt, but only adding a dependency on protobuf-java does not cause any issue, so my guess would be that there is some bug in the zio-temporal lib.

Would be thankful for any help on this.

Using `TestAspect.silentLogging` doesn't silent ZWorkerFactory shutdown message

In a test spec, I do some operations and they print the logs. I added TestAspect.silentLogging but the shutdown message is still printed.

Without the TestAspect.silentLogging aspect:

❯ ./mill e2e.test
[227/227] e2e.test.test
+ E2E
timestamp=2023-09-21T23:35:05.038217Z level=INFO thread=#zio-fiber-74 message="Started sample-worker listening to queue echo-queue" location=<empty>.Worker.worker file=Worker.scala line=7
timestamp=2023-09-21T23:35:05.405171Z level=INFO thread=#zio-fiber-74 message="ZWorkerFactory started" location=zio.temporal.worker.ZWorkerFactory.start file=ZWorkerFactory.scala line=32
timestamp=2023-09-21T23:35:05.440742Z level=INFO thread=#zio-fiber-74 message="Will submit message "testmsg123" with workflowID client-BKFH-02740-EQEY-49442" location=<empty>.Client.invokeWorkflow file=Client.scala line=17
timestamp=2023-09-21T23:35:05.810349Z level=INFO thread=#zio-fiber-96 message="Worker: Success processing message" location=<empty>.EchoActivityImpl.eventuallyFail file=EchoActivity.scala line=50
timestamp=2023-09-21T23:35:05.957688Z level=INFO thread=#zio-fiber-23 message="ZWorkerFactory shutdownNow initiated..." location=zio.temporal.worker.ZWorkerFactory.shutdownNow file=ZWorkerFactory.scala line=52
  + Start worker and send msg via client
1 tests passed. 0 tests failed. 0 tests ignored.

Executed in 1 s 820 ms

With @@ TestAspect.silentLogging added:

❯ ./mill e2e.test
[220/227] e2e.test.compile
[info] compiling 1 Scala source to /Users/cdepaula/repos/scala/zio-temporal-hello/out/e2e/test/compile.dest/classes ...
[info] done compiling
[227/227] e2e.test.test
+ E2E
timestamp=2023-09-21T23:35:28.806628Z level=INFO thread=#zio-fiber-23 message="ZWorkerFactory shutdownNow initiated..." location=zio.temporal.worker.ZWorkerFactory.shutdownNow file=ZWorkerFactory.scala line=52
  + Start worker and send msg via client
1 tests passed. 0 tests failed. 0 tests ignored.

Executed in 1 s 840 ms

The code is from https://github.com/carlosedp/zio-temporal-hello/blob/main/e2e/test/src/E2ESpec.scala

ZActivity.run doesn't work with local activities

When running an activity as LocalActivity with ZActivity run, it throws an exception:

Caused by: io.temporal.failure.ApplicationFailure: message='getTaskToken is not supported for local activities', type='java.lang.UnsupportedOperationException', nonRetryable=false

Need to handle this inside ZActivity.run when trying to get the task token for async completion

Replace <Thing>StubBuilder with <Thing>Options

In zio-temporal, WorkflowStubs and ActivityStubs are built using a custom builder, while Java SDK provides various WorkflowOptions, ActivityOptions, etc.
Because of this implementation details, various Java SDK functionality cannot be supported, including:

  • Workflow.setDefaultActivityOptions
  • Workflow.applyActivityOptions
  • Workflow.setDefaultLocalActivityOptions
  • Workflow.applyLocalActivityOptions
  • WorkflowImplementationOptions.setDefaultActivityOptions

We need to replace builders with corresponding wrappers around Java SDK's options. The wrappers should be typesafe enough

Base website documentation

Add the following pages

  • Child workflows
  • External workflows
  • Continue as new
  • Distributed sagas
  • Protobuf section
  • Testing section

Backport more Java SDK functionality

Need to backport more Java SDK functionality by providing convenient ZIO wrappers for them.
It includes simple methods, while complicated functionality is decoupled into dedicated tickets.

An incomplete list of what to add:

  • Workflows
  • Workflow.newQueue (and a wrapper for io.temporal.workflow.WorkflowQueue)
  • Workflow.getMemo
  • Workflow.retry
  • Workflow.registerListener (and dynamic workflows in general)
  • Check polymorphic activities example
  • ... to be completed

Cannot deserialize Either's with JacksonPayloadDataConverter

Problem:
Minimal reproducible example here https://github.com/alterationx10/temporal-either-repro/blob/main/src/test/scala/TemporalEitherSpec.scala.

The problem is that jackson-module-scala needs a type hint for Left/Right value. It worked somehow before untyped-stub rewrite.

TODO:

  • Try providing a type hint for jackson in TemporalWorkflowFacade
  • Examine ZWorkflowStub#result-like methods, so that they also use a type hint

NOTE: izumi reflect may help to do it (it's already available because of zio)

Revisit error model

Either's are not retried by temporal. Only thrown exceptions are.
The only way ZIO errors can be retried is using ZIO.die, which looks strange

Here is a list of examples to verify the new zio-temporal error model on:

Have separate build files for each example project

It would be great to have a build.sbt file for each example instead of using the main one. This way, people starting with zio-temporal have a clear base on what to use on fresh project.

I can submit this once #36 is merged. Also if desired I can also add the build.sc for Mill (which is the build tool I use as default).

Add TestEnvironment.sleep support

Currently, the only way to do so is using Java SDK directly:

_     <- ZIO.serviceWith[ZTestWorkflowEnvironment[Any]] { zTestEnv =>
                   zTestEnv.toJava.sleep(java.time.Duration.ofDays(5))
                 }

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.