vitaliihonta / zio-temporal
Build invincible apps with ZIO and https://temporal.io
Home Page: https://zio-temporal.vhonta.dev
License: Apache License 2.0
It would be nice to have zio-temporal added to https://github.com/zio/zio/tree/series/2.x/docs/ecosystem/community.
This includes:
I'm not sure if the code on the home page attracts people. Make it simpler (take inspiration from temporal's website?)
In my project I'm using the default zio-logging backend and when starting the app I get the following messages:
[47/47] hello.run
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
timestamp=2023-02-24T18:28:23.035899-03:00 level=INFO thread=zio-fiber-7 message="Started sample-worker"
timestamp=2023-02-24T18:28:23.078098-03:00 level=INFO thread=zio-fiber-6 message="HTTP Server started at http://localhost:8082"
Since SLF4J is not used, can these messages be suppressed?
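One possible way to silence these lines, assuming the build uses Mill and that SLF4J 2.x is pulled in transitively (as the `noProviders` link suggests), is to put the SLF4J no-op provider on the classpath explicitly. This is only a build fragment: the module name `hello` is taken from the log above, while the Scala version and the `slf4j-nop` version are assumptions that should be matched to the project.

```scala
// build.sc (fragment, a sketch): add the SLF4J no-op provider so that SLF4J
// finds a provider at startup and stops printing the warning.
import mill._, mill.scalalib._

object hello extends ScalaModule {
  def scalaVersion = "3.2.2" // assumption: use the project's own Scala version
  def ivyDeps = Agg(
    // ... existing dependencies ...
    ivy"org.slf4j:slf4j-nop:2.0.6" // assumption: align with the resolved slf4j-api version
  )
}
```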
Areas to improve:
zio-temporal-protobuf module
integration-tests (just examples using zio-temporal functionality)
I use Mill as a build tool, and while looking into protobuf usage in zio-temporal I ran into encoding errors. It turned out that zio-temporal could not load the proto file from my project.
I replicated the build.sbt file from https://github.com/vitaliihonta/func-scala-2022-zio-temporal as a build.sc (Mill build file) and saw that the proto file is not loaded, as seen in the logs below.
This issue is more of a question about how to debug this in the Mill build and fix its scalapb plugin. Where does this proto loading come from? The scalapb path?
❯ ./mill -i show cryptostock.run
[1/1] show
[1/1] show > [68/68] cryptostock.run
SLF4J: A number (19) of logging calls during the initialization phase have been intercepted and are
SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
SLF4J: See also http://www.slf4j.org/codes.html#replay
15:39:57.697 [ZScheduler-Worker-6] TRACE z.t.p.i.ProtoFileObjectAutoLoader$ - Provided exclude prefixes: [org/scalatools/,com.google.protobuf,com/google/type/,sbt,com.uber.m3,gogoproto/,com.google.geo,magnolia,org/codehaus/mojo/,com/google/longrunning/,com.cronutils,com.google.thirdparty,xsbt/,com.typesafe.config,distage,com/google/rpc/,logstage/,com/google/common/,akka/,com/thoughtworks/,com.google.logging,org/checkerframework/,zio/temporal/,org.slf4j,com/sun/,io/perfmark/,akka,android/,io.temporal,org.checkerframework,sun,com.google.api,com/google/errorprone/,com/cronutils/,org.scalatools,com/google/geo/,com.google.rpc,com.sun,org.LatencyUtils,izumi,org.reflections,com.google.errorprone,xsbti,xsbti/,sbt/,com/fasterxml/,logstage,pureconfig/,pureconfig,scala/,org/reflections/,com.thoughtworks,com/google/protobuf/,java,com/google/cloud/,com.google.common,com/uber/m3/,jdk/,cats/,izumi/,com.google.longrunning,javax,org/LatencyUtils/,org/slf4j/,org.codehaus.mojo,com.google.gson,org.HdrHistogram,com/google/logging/,com/google/gson/,com/typesafe/config/,io.grpc,zio,xsbt,net/sf/cglib/,io/micrometer/,com.google.type,javax/,com/google/api/,scala,magnolia/,zio/,sun/,io.micrometer,org/HdrHistogram/,com/google/thirdparty/,io.perfmark,mercator,io/temporal/,com.google.cloud,zio.temporal,cats,mercator/,META-INF,org/apache/ivy/,java/,org.apache.ivy,distage/,io/grpc/,com.fasterxml,jdk,module-info.class,META-INF/,gogoproto,android,net.sf.cglib]
15:39:57.719 [ZScheduler-Worker-6] INFO org.reflections.Reflections - Reflections took 8 ms to scan 0 urls, producing 0 keys and 0 values
15:39:57.728 [ZScheduler-Worker-6] TRACE z.t.p.i.ProtoFileObjectAutoLoader$ - Found subtypes of GeneratedFileObject: []
15:39:57.729 [ZScheduler-Worker-6] INFO z.t.p.i.ProtoFileObjectAutoLoader$ - Loaded 0 GeneratedFileObject(s): []
15:39:57.916 [zio-default-blocking-1] INFO i.t.s.WorkflowServiceStubsImpl - Created WorkflowServiceStubs for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=1, target=127.0.0.1:7233}}
timestamp=2023-04-17T18:39:58.476190Z level=INFO thread=#zio-fiber-6 message="Going to create an exchange order" location=com.cryptostock.Main$.run.exampleFlow.macro file=Main.scala line=19
15:39:58.486 [zio-default-blocking-2] INFO zio-slf4j-logger - timestamp=2023-04-17T15:39:58.483441-03:00 level=INFO thread=zio-fiber-6 message="Going to create an exchange order"
timestamp=2023-04-17T18:39:58.510999Z level=INFO thread=#zio-fiber-6 message="Going to trigger workflow" location=com.cryptostock.clients.ExchangeClientService.exchangeOrder.macro file=ExchangeClientService.scala line=39
15:39:58.511 [zio-default-blocking-2] INFO zio-slf4j-logger - timestamp=2023-04-17T15:39:58.511369-03:00 level=INFO thread=zio-fiber-6 message="Going to trigger workflow"
timestamp=2023-04-17T18:39:58.600419Z level=INFO thread=#zio-fiber-6 message="Order placed" location=com.cryptostock.Main$.run.exampleFlow.macro file=Main.scala line=24
15:39:58.601 [zio-default-blocking-2] INFO zio-slf4j-logger - timestamp=2023-04-17T15:39:58.600896-03:00 level=INFO thread=zio-fiber-6 message="Order placed"
15:39:58.726 [workflow-method-10b07a55-347f-4119-9b59-b1e4edffcc14-53a41b05-e116-4b6e-9af1-d92f2cf6c16c] ERROR i.t.i.sync.WorkflowExecuteRunnable - Workflow execution failure WorkflowId=10b07a55-347f-4119-9b59-b1e4edffcc14, RunId=53a41b05-e116-4b6e-9af1-d92f2cf6c16c, WorkflowType=ExchangeWorkflow
io.temporal.common.converter.DataConverterException: Unable to convert metadata {
key: "encoding"
value: "binary/protobuf"
}
metadata {
key: "messageType"
value: "com.cryptostock.exchange.ExchangeOrderRequest"
}
data: "\n\022\t\344N\026\214\235!\315\356\021\307\220#\267;$\027\252\022\b\b\001\022\004\n\002\001\244\030\000"
to com.cryptostock.exchange.ExchangeOrderRequest when parsing:"
�N��!��ǐ#�;$�
�" into following types: [class com.cryptostock.exchange.ExchangeOrderRequest]
at io.temporal.common.converter.DefaultDataConverter.fromPayload(DefaultDataConverter.java:140)
at io.temporal.common.converter.DataConverter.arrayFromPayloads(DataConverter.java:105)
at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:278)
at io.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:68)
at io.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:135)
at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:102)
at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
at io.temporal.worker.WorkerFactory.lambda$newWorkflowThreadExecutor$7(WorkerFactory.java:392)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: zio.temporal.protobuf.ProtobufPayloadException: Unable to convert metadata {
key: "encoding"
value: "binary/protobuf"
}
metadata {
key: "messageType"
value: "com.cryptostock.exchange.ExchangeOrderRequest"
}
data: "\n\022\t\344N\026\214\235!\315\356\021\307\220#\267;$\027\252\022\b\b\001\022\004\n\002\001\244\030\000"
to com.cryptostock.exchange.ExchangeOrderRequest
at zio.temporal.protobuf.ScalapbPayloadConverter.getCompanion$$anonfun$1(ScalapbPayloadConverter.scala:158)
at scala.collection.immutable.HashMap.getOrElse(HashMap.scala:678)
at zio.temporal.protobuf.ScalapbPayloadConverter.getCompanion(ScalapbPayloadConverter.scala:158)
at zio.temporal.protobuf.ScalapbPayloadConverter.fromData(ScalapbPayloadConverter.scala:113)
at io.temporal.common.converter.DefaultDataConverter.fromPayload(DefaultDataConverter.java:136)
... 12 common frames omitted
15:39:58.774 [workflow-method-10b07a55-347f-4119-9b59-b1e4edffcc14-53a41b05-e116-4b6e-9af1-d92f2cf6c16c] ERROR i.t.i.sync.WorkflowExecuteRunnable - Workflow execution failure WorkflowId=10b07a55-347f-4119-9b59-b1e4edffcc14, RunId=53a41b05-e116-4b6e-9af1-d92f2cf6c16c, WorkflowType=ExchangeWorkflow
io.temporal.common.converter.DataConverterException: Unable to convert metadata {
key: "encoding"
value: "binary/protobuf"
}
metadata {
key: "messageType"
value: "com.cryptostock.exchange.ExchangeOrderRequest"
}
data: "\n\022\t\344N\026\214\235!\315\356\021\307\220#\267;$\027\252\022\b\b\001\022\004\n\002\001\244\030\000"
to com.cryptostock.exchange.ExchangeOrderRequest when parsing:"
�N��!��ǐ#�;$�
�" into following types: [class com.cryptostock.exchange.ExchangeOrderRequest]
at io.temporal.common.converter.DefaultDataConverter.fromPayload(DefaultDataConverter.java:140)
at io.temporal.common.converter.DataConverter.arrayFromPayloads(DataConverter.java:105)
at io.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:278)
at io.temporal.internal.sync.WorkflowExecuteRunnable.run(WorkflowExecuteRunnable.java:68)
at io.temporal.internal.sync.SyncWorkflow.lambda$start$0(SyncWorkflow.java:135)
at io.temporal.internal.sync.CancellationScopeImpl.run(CancellationScopeImpl.java:102)
at io.temporal.internal.sync.WorkflowThreadImpl$RunnableWrapper.run(WorkflowThreadImpl.java:107)
at io.temporal.worker.WorkerFactory.lambda$newWorkflowThreadExecutor$7(WorkerFactory.java:392)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: zio.temporal.protobuf.ProtobufPayloadException: Unable to convert metadata {
key: "encoding"
value: "binary/protobuf"
}
metadata {
key: "messageType"
value: "com.cryptostock.exchange.ExchangeOrderRequest"
}
data: "\n\022\t\344N\026\214\235!\315\356\021\307\220#\267;$\027\252\022\b\b\001\022\004\n\002\001\244\030\000"
to com.cryptostock.exchange.ExchangeOrderRequest
at zio.temporal.protobuf.ScalapbPayloadConverter.getCompanion$$anonfun$1(ScalapbPayloadConverter.scala:158)
at scala.collection.immutable.HashMap.getOrElse(HashMap.scala:678)
at zio.temporal.protobuf.ScalapbPayloadConverter.getCompanion(ScalapbPayloadConverter.scala:158)
at zio.temporal.protobuf.ScalapbPayloadConverter.fromData(ScalapbPayloadConverter.scala:113)
at io.temporal.common.converter.DefaultDataConverter.fromPayload(DefaultDataConverter.java:136)
... 12 common frames omitted
The build.sc file is below:
import mill._, mill.scalalib._, mill.scalalib.scalafmt._
import $ivy.`com.lihaoyi::mill-contrib-scalapblib:`, contrib.scalapblib._

object versions {
  val scala3      = "3.1.2"
  val zio         = "2.0.0"
  val ziologging  = "2.0.0"
  val ziotemporal = "0.1.0-RC5"
}

object cryptostock extends SbtModule with ScalafmtModule with ScalaPBModule {
  def scalaVersion = versions.scala3
  def ivyDeps = Agg(
    ivy"dev.zio::zio:${versions.zio}",
    ivy"dev.zio::zio-logging:${versions.ziologging}",
    ivy"dev.zio::zio-logging-slf4j:${versions.ziologging}",
    ivy"dev.vhonta::zio-temporal-core:${versions.ziotemporal}",
    ivy"dev.vhonta::zio-temporal-protobuf:${versions.ziotemporal}",
    ivy"com.thesamet.scalapb::scalapb-runtime:0.11.11",
    ivy"ch.qos.logback:logback-classic:1.2.11"
  )

  object test extends Tests {
    def testFramework = T("zio.test.sbt.ZTestFramework")
    def ivyDeps = Agg(
      ivy"dev.zio::zio-test:${versions.zio}",
      ivy"dev.zio::zio-test-sbt:${versions.zio}",
      ivy"dev.vhonta::zio-temporal-testkit:${versions.ziotemporal}"
    )
  }

  def scalaPBVersion = "0.11.11"
  def scalaPBSources = T.sources {
    millSourcePath / "src" / "main" / "protobuf"
  }
  def scalaPBIncludePath = T.sources(Seq(scalaPBUnpackProto()))
  def scalaPBFlatPackage = true
  def scalaPBGrpc        = false
}
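The log line `Reflections took 8 ms to scan 0 urls` suggests the scanner saw no classpath entries at all. As a first diagnostic, and only as a sketch that uses nothing beyond the JDK, one could print what the running JVM reports as its classpath and check whether a generated class is loadable. The object and method names here are hypothetical; the class name is the one from the error above.

```scala
import java.io.File
import scala.util.Try

object ClasspathDiagnostics {
  // Entries of the JVM classpath as reported by the `java.class.path` system property.
  def classpathEntries: List[String] =
    Option(System.getProperty("java.class.path")).toList
      .flatMap(_.split(File.pathSeparator).toList)
      .filter(_.nonEmpty)

  // True when the named class can be found by the current thread's class loader.
  // The class is looked up without being initialized.
  def isLoadable(className: String): Boolean = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Try(Class.forName(className, false, loader)).isSuccess
  }

  def main(args: Array[String]): Unit = {
    classpathEntries.foreach(entry => println(s"classpath entry: $entry"))
    val generated = "com.cryptostock.exchange.ExchangeOrderRequest"
    println(s"$generated loadable: ${isLoadable(generated)}")
  }
}
```

If the class is loadable but the classpath property holds only a launcher or manifest-only JAR, that would point at how the build tool passes the classpath to the forked JVM rather than at scalapb code generation.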
When giving the workflow an explicit name (with the workflowMethod annotation) that is different from the trait name, the workflow is not found when using a typed stub.
I assume this is due to the move to untyped stubs internally.
Protobuf:
Workflow:
(+=, -= for numbers, something for Map)
ZAsync:
Misc:
(ZIO_TEMPORAL_ZWORKFLOWSERVICESTUBS_SERVERURL => ZIO_TEMPORAL_ZWORKFLOW_SERVICE_STUBS_SERVER_URL, etc.)
When creating activities with ZIO effects that might fail, the return value of ZActivity.run is Either[Failure,Success].
@activityInterface
trait EchoActivity:
  @activityMethod
  def echo(msg: String): Either[Exception, String]

// Activity Implementation
class EchoActivityImpl(
  implicit options: ZActivityOptions[Any],
) extends EchoActivity:
  override def echo(msg: String): Either[Exception, String] =
    ZActivity.run:
      for
        _      <- ZIO.logDebug(s"Received message: $msg")
        newMsg <- eventuallyFail(s"ACK: $msg", 30)
      yield newMsg
// This returns a ZIO[Any, Exception, String] which is mapped to an Either in `.run`
As above, declaring the return type in my activity signature (which is required) and implementation as Either[Failure,Success] throws a Temporal serialization error:
timestamp=2023-03-21T09:24:04.549352-03:00 level=ERROR thread=zio-fiber-0 message="" cause=Exception in thread "zio-fiber-6" zio.temporal.TemporalClientError: TemporalClientError(io.temporal.client.WorkflowFailedException: Workflow execution {workflowId='echo-b3794948-b558-48ab-a76d-8dacadabe3b4', runId='6ce905f5-a64c-415b-bedf-7ed575bf625c', workflowType='EchoWorkflow'} failed. Metadata: {closeEventType='EVENT_TYPE_WORKFLOW_EXECUTION_FAILED', retryState='RETRY_STATE_RETRY_POLICY_NOT_SET', workflowTaskCompletedEventId=10'} cause=io.temporal.failure.ApplicationFailure: message='class java.util.LinkedHashMap cannot be cast to class scala.util.Either (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; scala.util.Either is in unnamed module of loader 'app')', type='java.lang.ClassCastException', nonRetryable=false)
at zio.temporal.internal.TemporalInteraction$.fromFuture.macro(TemporalInteraction.scala:27)
at zio.temporal.internal.TemporalInteraction$.fromFuture.macro(TemporalInteraction.scala:28)
at <empty>.Client.workflowResultZIO(zio-temporal-activity-retry.scala:125)
at zio.temporal.worker.ZWorkerFactory.use.macro(ZWorkerFactory.scala:27)
at zio.temporal.worker.ZWorkerFactory.use.macro(ZWorkerFactory.scala:28)
at <empty>.Main.run.program(zio-temporal-activity-retry.scala:142)
at <empty>.Main.run(zio-temporal-activity-retry.scala:144)
The workaround is to define the signature as Any, like:
@activityInterface
trait EchoActivity:
  @activityMethod
  def echo(msg: String): Any

// Activity Implementation
class EchoActivityImpl(
  implicit options: ZActivityOptions[Any],
) extends EchoActivity:
  override def echo(msg: String): Any =
    ZActivity.run:
      for
        _      <- ZIO.logDebug(s"Received message: $msg")
        newMsg <- eventuallyFail(s"ACK: $msg", 30)
      yield newMsg
This kind of works, but the return value from Temporal is still an Either wrapped in JSON, as seen in the logs below, so something like a cast is required to extract the correct value.
❯ scli zio-temporal-activity-retry.scala
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
timestamp=2023-03-21T09:22:42.818928-03:00 level=INFO thread=zio-fiber-7 message="Started sample-worker"
timestamp=2023-03-21T09:22:43.165645-03:00 level=INFO thread=zio-fiber-6 message="Will submit message "testMsg""
timestamp=2023-03-21T09:22:43.427282-03:00 level=DEBUG thread=zio-fiber-27 message="Received message: testMsg"
timestamp=2023-03-21T09:22:43.523249-03:00 level=INFO thread=zio-fiber-6 message="The workflow result: {value=ACK: testMsg, left=false, right=true}"
The full code is in https://gist.github.com/carlosedp/61703d5c436dc50ec5876249fbbbed31
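The logged result `{value=ACK: testMsg, left=false, right=true}` looks like the `toString` of a `java.util.Map`, which matches the `LinkedHashMap` named in the `ClassCastException`. Until the converter handles `Either`, one stopgap is to rebuild the `Either` by hand. The sketch below assumes exactly that map shape; `eitherFromMap` is a hypothetical helper and not part of zio-temporal.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

object EitherFromMap {
  // Rebuilds an Either from the map shape seen in the log:
  // {value=..., left=false, right=true}. Any other shape is rejected.
  def eitherFromMap(raw: Any): Either[Any, Any] = raw match {
    case m: JMap[_, _] if m.containsKey("value") && m.containsKey("right") =>
      val value: Any = m.get("value")
      val right: Any = m.get("right")
      if (java.lang.Boolean.TRUE.equals(right)) Right(value) else Left(value)
    case other =>
      throw new IllegalArgumentException(s"Unexpected activity result shape: $other")
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for what the untyped result looked like in the log above.
    val raw = new JLinkedHashMap[String, Any]()
    raw.put("value", "ACK: testMsg")
    raw.put("left", false)
    raw.put("right", true)
    println(eitherFromMap(raw)) // prints Right(ACK: testMsg)
  }
}
```

This only recovers the outer `Either`; the element types stay `Any`, so a typed fix still has to come from the payload converter.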
Try utilizing Scope-based resource management
When passing a ZChildWorkflowStub (instead of a concrete ZChildWorkflowStub.Of[A]) inside ZChildWorkflowStub.execute (etc.), the error message is obscure:
Exception occurred while executing macro expansion.
java.util.NoSuchElementException: head of empty list
at scala.collection.immutable.Nil$.head(List.scala:662)
at scala.collection.immutable.Nil$.head(List.scala:661)
at zio.temporal.internal.InvocationMacroUtils$MethodInvocation.selectJavaReprOf(InvocationMacroUtils.scala:63)
at zio.temporal.workflow.ZChildWorkflowExecutionSyntax$.executeImpl(ZChildWorkflowExecutionSyntax.scala:26)
This can definitely be improved
Reproducer:
import zio.*
import zio.temporal.*
import zio.temporal.testkit.*
import zio.temporal.workflow.*
object Repro extends ZIOApp {
override def run: ZIO[Environment, Any, Unit] = e
val environmentTag: EnvironmentTag[Environment] = EnvironmentTag[Environment]
val e = for {
testEnv <- ZIO.service[ZTestWorkflowEnvironment[Any]]
worker <- testEnv.newWorker("q")
_ <- worker.addWorkflow[WFImpl].fromClass
_ <- worker.addActivityImplementation(ActivityImpl)
_ <- testEnv.start
wfStub <- testEnv.workflowClient.newWorkflowStub[WF]
.withTaskQueue("q")
.withWorkflowId("wid")
.withWorkflowRunTimeout(1.minute)
.build
_ <- ZWorkflowStub.execute(wfStub.run())
} yield ()
override type Environment = ZTestWorkflowEnvironment[Any]
override def bootstrap: ZLayer[ZIOAppArgs, Any, Environment] = ZLayer.make[ZTestWorkflowEnvironment[Any]](
ZTestWorkflowEnvironment.make[Any],
ZTestEnvironmentOptions.default
)
}
@activityInterface
trait Activity {
@activityMethod
def act(): Unit
}
object ActivityImpl extends Activity {
override def act(): Unit = println("act")
}
@workflowInterface
trait WF {
@workflowMethod
def run(): Unit
}
class WFImpl extends WF {
override def run(): Unit =
ZWorkflow.newLocalActivityStub[Activity]
.withStartToCloseTimeout(10.seconds)
.build.act()
}
I'm trying to generate assembly JARs for some applications (to run in a Docker container, for example), and when run they throw exceptions.
Sample:
https://gist.github.com/carlosedp/8af4ec212bd987335c47933bee4c8db0
❯ scala-cli package -f zio-temporal.scala -o ziotemporal.jar --assembly
Wrote /Users/cdepaula/repos/scala-playground/zio/ziotemporal.jar, run it with
./ziotemporal.jar
❯ ll ziotemporal.jar
.rwxr-xr-x 57Mi cdepaula 13 Mar 09:28 ziotemporal.jar*
Error:
On another application from https://github.com/carlosedp/zio-temporal-hello, I generate the Assembly and it returns:
Both run fine when I use the build tool's run, like scala-cli zio-temporal.scala for the gist and ./mill run worker.run.
Once ZIO drops Java 8, we have to do it as well.
Follow this PR and further ZIO releases zio/zio#8434
I got bit by a problem with the reflection logic that attempts to determine whether Async.function is being passed a method reference in MethodReferenceDisassembler#isAsyncJava. Turns out that a method reference in Scala has a different result from getImplMethodKind. It returns MethodHandleInfo.REF_invokeStatic instead of MethodHandleInfo.REF_invokeInterface. We had to invoke this through a small Java shim to get it to function correctly and not execute directly on the local workflow executor.
I think ztemporal may have the same problem. It also appears to me to call Async.function passing a Scala lambda expression.
temporalio/sdk-java#1007 (comment)
I have not tested this myself yet, but it certainly looks like this is a problem. I'd like to start using zio-temporal soon, so hopefully I can verify and maybe find a solution.
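To check the quoted claim locally, one option is to read the `SerializedLambda` proxy of a Scala lambda and look at `getImplMethodKind` directly. The sketch below uses only JDK reflection and assumes the lambda is serializable (Scala function literals normally are); `implMethodKind` is a hypothetical helper, and it returns `None` rather than failing when no proxy is available.

```scala
import java.lang.invoke.SerializedLambda
import scala.util.Try

object LambdaKind {
  // Reads the SerializedLambda proxy of a serializable lambda through its
  // synthetic `writeReplace` method and returns getImplMethodKind.
  // Returns None when the object exposes no such proxy.
  def implMethodKind(lambda: AnyRef): Option[Int] =
    Try {
      val writeReplace = lambda.getClass.getDeclaredMethod("writeReplace")
      writeReplace.setAccessible(true)
      writeReplace.invoke(lambda)
    }.toOption.collect { case proxy: SerializedLambda => proxy.getImplMethodKind }

  def main(args: Array[String]): Unit = {
    val scalaLambda: String => Int = _.length
    // Compare the printed number with the java.lang.invoke.MethodHandleInfo.REF_* constants.
    println(s"implMethodKind = ${implMethodKind(scalaLambda)}")
  }
}
```

Running this against the lambdas actually passed to `Async.function` would show whether they report the same kind as described in the quoted comment.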
With the Java SDK one can do:
val sa = Workflow.getTypedSearchAttributes
val optionsBuilder = ChildWorkflowOptions.newBuilder().setTypedSearchAttributes(sa)
With the current zio-temporal API, this is not possible, or at least I couldn't find a way to do that
While migrating from 0.5 to 0.6, I found that ZWorkflowOptions is positional.
If I set it up as:
client.newWorkflowStub[EchoWorkflow](
ZWorkflowOptions
.withTaskQueue(TemporalQueues.echoQueue)
.withWorkflowId(workflowID)
.withWorkflowExecutionTimeout(60.seconds)
.withWorkflowRunTimeout(10.seconds)
.withRetryOptions(
ZRetryOptions.default
.withMaximumAttempts(3)
.withInitialInterval(300.millis)
.withBackoffCoefficient(1)
)
)
I get the error:
[error] -- [E008] Not Found Error: /Users/cdepaula/repos/scala/zio-temporal-hello/webclient/src/WebClient.scala:26:25
[error] 25 | ZWorkflowOptions
[error] 26 | .withTaskQueue(TemporalQueues.echoQueue)
[error] | ^
[error] |value withTaskQueue is not a member of object zio.temporal.workflow.ZWorkflowOptions
[error] one error found
1 targets failed
But after moving withWorkflowId to the top, it works:
client.newWorkflowStub[EchoWorkflow](
ZWorkflowOptions
.withWorkflowId(workflowID)
.withTaskQueue(TemporalQueues.echoQueue)
.withWorkflowExecutionTimeout(60.seconds)
.withWorkflowRunTimeout(10.seconds)
.withRetryOptions(
ZRetryOptions.default
.withMaximumAttempts(3)
.withInitialInterval(300.millis)
.withBackoffCoefficient(1)
)
)
I think the order shouldn't matter.
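One plausible explanation for the error is a staged builder in which the companion object exposes only the first required setter. The following is a simplified, hypothetical model of that pattern (the names `Options` and `SetTaskQueue` are illustrative, and this is not zio-temporal's source); it shows why `withTaskQueue` would not be a member of the object itself while the reordered chain compiles.

```scala
import scala.concurrent.duration._

object StagedBuilderModel {
  final case class Options(
    workflowId: String,
    taskQueue: String,
    runTimeout: Option[FiniteDuration] = None
  ) {
    // Optional settings live on the finished value, so their order is free.
    def withWorkflowRunTimeout(t: FiniteDuration): Options = copy(runTimeout = Some(t))
  }

  // Intermediate stage: only the next required field can be set here.
  final class SetTaskQueue private[StagedBuilderModel] (workflowId: String) {
    def withTaskQueue(queue: String): Options = Options(workflowId, queue)
  }

  object Options {
    // The only entry point on the object, so the workflow id has to come first.
    def withWorkflowId(id: String): SetTaskQueue = new SetTaskQueue(id)
  }

  def main(args: Array[String]): Unit = {
    val opts = Options
      .withWorkflowId("wid")
      .withTaskQueue("echo-queue")
      .withWorkflowRunTimeout(10.seconds)
    println(opts)
    // Options.withTaskQueue("echo-queue") // would not compile: not a member of object Options
  }
}
```

Making the order irrelevant in such a design would mean exposing every required setter on the entry point and tracking which ones have been called, for example with phantom types.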
We don't need a custom ZLayerAspect implementation as ZIO recently introduced its own.
We need to remove our own and migrate to ZIO's ZLayer aspects once zio/zio#8391 is released.
It would be great to have simple starter examples (same as in the zio-temporal-samples repo).
As mentioned by @carlosedp:
It could be a tutorial section on the website
Running Temporal workflows via sbt/mill in combination with zio-temporal-protobuf produces a noisy warning (which doesn't affect the code, by the way).
❯ ./mill -i show cryptostock.run
[1/1] show > [67/67] cryptostock.run
18:42:58.992 [ZScheduler-Worker-3] TRACE z.t.p.i.ProtoFileObjectAutoLoader$ - Provided exclude prefixes: [org/scalatools/,com.google.protobuf,com/google/type/,sbt,com.uber.m3,gogoproto/,com.google.geo,magnolia,org/codehaus/mojo/,com/google/longrunning/,com.cronutils,com.google.thirdparty,xsbt/,com.typesafe.config,distage,com/google/rpc/,logstage/,com/google/common/,akka/,com/thoughtworks/,com.google.logging,org/checkerframework/,zio/temporal/,org.slf4j,com/sun/,io/perfmark/,akka,android/,io.temporal,org.checkerframework,sun,com.google.api,com/google/errorprone/,com/cronutils/,org.scalatools,com/google/geo/,com.google.rpc,com.sun,org.LatencyUtils,izumi,org.reflections,com.google.errorprone,xsbti,xsbti/,sbt/,com/fasterxml/,logstage,pureconfig/,pureconfig,scala/,org/reflections/,com.thoughtworks,com/google/protobuf/,java,com/google/cloud/,com.google.common,com/uber/m3/,jdk/,cats/,izumi/,com.google.longrunning,javax,org/LatencyUtils/,org/slf4j/,org.codehaus.mojo,com.google.gson,org.HdrHistogram,com/google/logging/,com/google/gson/,com/typesafe/config/,io.grpc,zio,xsbt,net/sf/cglib/,io/micrometer/,com.google.type,javax/,com/google/api/,scala,magnolia/,zio/,sun/,io.micrometer,org/HdrHistogram/,com/google/thirdparty/,io.perfmark,mercator,io/temporal/,com.google.cloud,zio.temporal,cats,mercator/,META-INF,org/apache/ivy/,java/,org.apache.ivy,distage/,io/grpc/,com.fasterxml,jdk,module-info.class,META-INF/,gogoproto,android,net.sf.cglib]
18:42:59.228 [ForkJoinPool.commonPool-worker-23] WARN org.reflections.Reflections - could not create Vfs.Dir from url. ignoring the exception and continuing
org.reflections.ReflectionsException: could not create Vfs.Dir from url, no matching UrlType was found [file:/home/carlosedp/repos/func-scala-2022-zio-temporal/cryptostock/compile-resources]
either use fromURL(final URL url, final List<UrlType> urlTypes) or use the static setDefaultURLTypes(final List<UrlType> urlTypes) or addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
at org.reflections.vfs.Vfs.fromURL(Vfs.java:113)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:95)
at org.reflections.Reflections.lambda$scan$2(Reflections.java:176)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1707)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Hello! I was really excited to try zio-temporal for a project but from one day to the next I was unable to compile my project. Now I am unable to compile any project, on any computer I have tried (one MacBook and one Thinkpad running Ubuntu), where I include zio-temporal. I did eventually find a workaround though.
Minimum Example
sbt new scala/hello-world.g8
build.sbt:
lazy val temporalCore = "dev.vhonta" %% "zio-temporal-core" % "0.1.0-RC6"
libraryDependencies ++= Seq(
  "org.scala-lang.modules" %% "scala-parser-combinators" % "2.1.1",
  temporalCore
)
sbt compile in repo.
Expected Behaviour
My project compiles as it should
Actual Behaviour
I get the following error:
[warn] Note: Unresolved dependencies path:
[error] sbt.librarymanagement.ResolveException: Error downloading $com.google.protobuf:protobuf-java:
[error] Not found
[error] Not found
[error] not found: /home/ragge/.ivy2/local$com.google.protobuf/protobuf-java/ivys/ivy.xml
[error] not found: https://repo1.maven.org/maven2/$com/google/protobuf/protobuf-java//protobuf-java-.pom
[error] at lmcoursier.CoursierDependencyResolution.unresolvedWarningOrThrow(CoursierDependencyResolution.scala:344)
[error] at lmcoursier.CoursierDependencyResolution.$anonfun$update$38(CoursierDependencyResolution.scala:313)
[error] at scala.util.Either$LeftProjection.map(Either.scala:573)
[error] at lmcoursier.CoursierDependencyResolution.update(CoursierDependencyResolution.scala:313)
[error] at sbt.librarymanagement.DependencyResolution.update(DependencyResolution.scala:60)
[error] at sbt.internal.LibraryManagement$.resolve$1(LibraryManagement.scala:59)
[error] at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$12(LibraryManagement.scala:133)
[error] at sbt.util.Tracked$.$anonfun$lastOutput$1(Tracked.scala:73)
[error] at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$20(LibraryManagement.scala:146)
[error] at scala.util.control.Exception$Catch.apply(Exception.scala:228)
[error] at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$11(LibraryManagement.scala:146)
[error] at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$11$adapted(LibraryManagement.scala:127)
[error] at sbt.util.Tracked$.$anonfun$inputChangedW$1(Tracked.scala:219)
[error] at sbt.internal.LibraryManagement$.cachedUpdate(LibraryManagement.scala:160)
[error] at sbt.Classpaths$.$anonfun$updateTask0$1(Defaults.scala:3687)
[error] at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[error] at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
[error] at sbt.std.Transform$$anon$4.work(Transform.scala:68)
[error] at sbt.Execute.$anonfun$submit$2(Execute.scala:282)
[error] at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:23)
[error] at sbt.Execute.work(Execute.scala:291)
[error] at sbt.Execute.$anonfun$submit$1(Execute.scala:282)
[error] at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[error] at sbt.CompletionService$$anon$2.call(CompletionService.scala:64)
[error] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
[error] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
[error] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
[error] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
[error] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
[error] at java.base/java.lang.Thread.run(Thread.java:1589)
[error] (update) sbt.librarymanagement.ResolveException: Error downloading $com.google.protobuf:protobuf-java:
Workaround
The problem seems to be (to me) that when sbt tries to resolve the transitive dependency com.google.protobuf:protobuf-java it somehow adds $com.google.protobuf:protobuf-java as a dependency. I managed to get my projects compiling by simply adding the following to the example above:
lazy val temporalCore = "dev.vhonta" %% "zio-temporal-core" % "0.1.0-RC6"
libraryDependencies ++= Seq(
"org.scala-lang.modules" %% "scala-parser-combinators" % "2.1.1",
temporalCore
)
excludeDependencies += ExclusionRule("$com.google.protobuf","protobuf-java")//<-- What I added
I know too little about sbt, but adding only a dependency on protobuf-java does not cause any issue, so my guess would be that there is some bug in the zio-temporal lib.
I would be thankful for any help on this.
In a test spec, I do some operations and they print the logs. I added TestAspect.silentLogging but the shutdown message is still printed.
Without the TestAspect.silentLogging aspect:
❯ ./mill e2e.test
[227/227] e2e.test.test
+ E2E
timestamp=2023-09-21T23:35:05.038217Z level=INFO thread=#zio-fiber-74 message="Started sample-worker listening to queue echo-queue" location=<empty>.Worker.worker file=Worker.scala line=7
timestamp=2023-09-21T23:35:05.405171Z level=INFO thread=#zio-fiber-74 message="ZWorkerFactory started" location=zio.temporal.worker.ZWorkerFactory.start file=ZWorkerFactory.scala line=32
timestamp=2023-09-21T23:35:05.440742Z level=INFO thread=#zio-fiber-74 message="Will submit message "testmsg123" with workflowID client-BKFH-02740-EQEY-49442" location=<empty>.Client.invokeWorkflow file=Client.scala line=17
timestamp=2023-09-21T23:35:05.810349Z level=INFO thread=#zio-fiber-96 message="Worker: Success processing message" location=<empty>.EchoActivityImpl.eventuallyFail file=EchoActivity.scala line=50
timestamp=2023-09-21T23:35:05.957688Z level=INFO thread=#zio-fiber-23 message="ZWorkerFactory shutdownNow initiated..." location=zio.temporal.worker.ZWorkerFactory.shutdownNow file=ZWorkerFactory.scala line=52
+ Start worker and send msg via client
1 tests passed. 0 tests failed. 0 tests ignored.
Executed in 1 s 820 ms
With @@ TestAspect.silentLogging added:
❯ ./mill e2e.test
[220/227] e2e.test.compile
[info] compiling 1 Scala source to /Users/cdepaula/repos/scala/zio-temporal-hello/out/e2e/test/compile.dest/classes ...
[info] done compiling
[227/227] e2e.test.test
+ E2E
timestamp=2023-09-21T23:35:28.806628Z level=INFO thread=#zio-fiber-23 message="ZWorkerFactory shutdownNow initiated..." location=zio.temporal.worker.ZWorkerFactory.shutdownNow file=ZWorkerFactory.scala line=52
+ Start worker and send msg via client
1 tests passed. 0 tests failed. 0 tests ignored.
Executed in 1 s 840 ms
The code is from https://github.com/carlosedp/zio-temporal-hello/blob/main/e2e/test/src/E2ESpec.scala
When running an activity as a LocalActivity with ZActivity.run, it throws an exception:
Caused by: io.temporal.failure.ApplicationFailure: message='getTaskToken is not supported for local activities', type='java.lang.UnsupportedOperationException', nonRetryable=false
This needs to be handled inside ZActivity.run when trying to get the task token for async completion.
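The handling described here could take the shape of a guarded token lookup. The sketch below is a self-contained model with hypothetical types (`ActivityContext`, `LocalActivityContext`, `RegularActivityContext`) standing in for the real execution context; only the behaviour from the error message is modelled, and nothing here is zio-temporal or Java SDK code.

```scala
import scala.util.{Failure, Success, Try}

object TaskTokenGuard {
  // Hypothetical stand-in for the activity execution context.
  trait ActivityContext { def taskToken: Array[Byte] }

  // Models a local activity, which cannot provide a task token.
  object LocalActivityContext extends ActivityContext {
    def taskToken: Array[Byte] =
      throw new UnsupportedOperationException("getTaskToken is not supported for local activities")
  }

  // Models a regular activity that carries a token.
  final class RegularActivityContext(token: Array[Byte]) extends ActivityContext {
    def taskToken: Array[Byte] = token
  }

  // Returns None for contexts that cannot provide a token; rethrows anything else.
  def taskTokenOption(ctx: ActivityContext): Option[Array[Byte]] =
    Try(ctx.taskToken) match {
      case Success(token)                            => Some(token)
      case Failure(_: UnsupportedOperationException) => None
      case Failure(other)                            => throw other
    }
}
```

With a lookup like this, the caller can fall back to completing the activity synchronously when no token is available.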
In zio-temporal, WorkflowStubs and ActivityStubs are built using a custom builder, while Java SDK provides various WorkflowOptions, ActivityOptions, etc.
Because of these implementation details, various Java SDK features cannot be supported, including:
We need to replace the builders with corresponding wrappers around the Java SDK's options. The wrappers should be typesafe enough.
Add the following pages
Need to backport more Java SDK functionality by providing convenient ZIO wrappers for them.
It includes simple methods, while complicated functionality is decoupled into dedicated tickets.
An incomplete list of what to add:
io.temporal.workflow.WorkflowQueue
Problem:
Minimal reproducible example here https://github.com/alterationx10/temporal-either-repro/blob/main/src/test/scala/TemporalEitherSpec.scala.
The problem is that jackson-module-scala needs a type hint for Left/Right value. It worked somehow before untyped-stub rewrite.
TODO:
NOTE: izumi reflect may help to do it (it's already available because of zio)
Eithers are not retried by Temporal; only thrown exceptions are.
The only way ZIO errors can be retried is by using ZIO.die, which looks strange.
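The distinction can be illustrated with a toy retry loop. This is a plain-Scala model, not Temporal's retry machinery, and `retryOnThrow` is a hypothetical helper: it re-runs the body only when it throws, so a returned `Left` counts as a completed call and is never retried.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetryModel {
  // Re-runs `body` only when it throws, up to `maxAttempts` attempts in total.
  // Any returned value, including a Left, ends the loop immediately.
  @tailrec
  def retryOnThrow[A](maxAttempts: Int, attempt: Int = 1)(body: Int => A): A =
    Try(body(attempt)) match {
      case Success(value)                        => value
      case Failure(_) if attempt < maxAttempts   => retryOnThrow(maxAttempts, attempt + 1)(body)
      case Failure(error)                        => throw error
    }

  def main(args: Array[String]): Unit = {
    var leftCalls = 0
    retryOnThrow[Either[String, String]](3) { _ => leftCalls += 1; Left("boom") }
    println(s"leftCalls = $leftCalls") // prints leftCalls = 1

    var throwCalls = 0
    retryOnThrow(3) { n =>
      throwCalls += 1
      if (n < 3) throw new RuntimeException("boom") else "ok"
    }
    println(s"throwCalls = $throwCalls") // prints throwCalls = 3
  }
}
```

Under this model, turning a typed ZIO failure into a thrown exception at the activity boundary is what makes it visible to a retry policy.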
Here is a list of examples to verify the new zio-temporal error model on:
Take a look at @carlosedp example here https://github.com/carlosedp/zio-temporal-hello/tree/main/shared/resources/META-INF/native-image
I believe it could be handled somehow
It would be great to have a build.sbt file for each example instead of using the main one. This way, people starting with zio-temporal have a clear base for what to use in a fresh project.
I can submit this once #36 is merged. If desired, I can also add a build.sc for Mill (which is the build tool I use by default).
Currently, the only way to do so is using Java SDK directly:
_ <- ZIO.serviceWith[ZTestWorkflowEnvironment[Any]] { zTestEnv =>
zTestEnv.toJava.sleep(java.time.Duration.ofDays(5))
}