
Playgrounds

Playgrounds aims to provide a quick-start environment and examples that help users quickly understand the features of PyFlink. It sets up the environment with docker-compose and integrates PyFlink, Kafka, and Python so everything is easy to try out. The current Playgrounds examples are based on the latest PyFlink (1.13.0).

Usage

Please check out the branch for a specific Flink version to see how to use PyFlink in that version, as PyFlink is still under active development and more functionality is added with each release.

Create Docker Image

cd image
 
# create docker image
docker build --tag pyflink/playgrounds:1.13.0-rc2 .

# publish docker image
docker push pyflink/playgrounds:1.13.0-rc2

Environment Setup

  1. Install Docker.
  2. Get the Docker Compose configuration
git clone https://github.com/pyflink/playgrounds.git
  3. Set up the environment
  • Linux & MacOS
cd playgrounds
docker-compose up -d
  • Windows
cd playgrounds
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d
  4. Check the logs of the JM and TM

Check the logs of JM:

docker-compose logs jobmanager

Check the logs of TM:

docker-compose logs taskmanager

You can check whether the environment is running correctly by visiting the Flink Web UI at http://localhost:8081.

Examples

  1. PyFlink Table API WordCount
  2. Read and write with Kafka using PyFlink Table API
  3. Python UDF
  4. Python UDF with dependencies
  5. Python Pandas UDF
  6. Python UDF with metrics
  7. Python UDF used in Java Table API jobs
  8. Python UDF used in pure-SQL jobs
  9. PyFlink DataStream API WordCount
  10. Keyed Stream of PyFlink DataStream API
  11. State Access in PyFlink DataStream API

1-PyFlink Table API WordCount

Code: 1-word_count.py

Run:

cd playgrounds
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/table/1-word_count.py

Check Results:

A result file will be added in the path /opt/examples/table/output/word_count_output/:

docker-compose exec taskmanager cat /opt/examples/table/output/word_count_output/part-aec367b4-5e68-4958-bbb9-98b264e0d314-cp-0-task-0-file-0

The results look like:

flink	2
pyflink	1
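The counting logic this job performs can be sketched in plain Python (a hedged approximation of what 1-word_count.py computes — the actual example uses PyFlink's Table API group-by aggregation rather than a local counter):

```python
from collections import Counter

def word_count(text: str) -> dict:
    """Count occurrences of each whitespace-separated word."""
    return dict(Counter(text.split()))

# Counts matching the sample output above
print(word_count("flink pyflink flink"))  # {'flink': 2, 'pyflink': 1}
```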

2-Read and write with Kafka using PyFlink Table API

Code: 2-from_kafka_to_kafka.py

Run:

cd playgrounds
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/table/2-from_kafka_to_kafka.py

Check Results:

docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TempResults

The results look like:

{"rideId":3321,"taxiId":2013003189,"isStart":true,"lon":-73.99606,"lat":40.725132,"psgCnt":2,"rideTime":"2013-01-01T00:11:47Z"}
{"rideId":744,"taxiId":2013000742,"isStart":false,"lon":-73.97362,"lat":40.791283,"psgCnt":1,"rideTime":"2013-01-01T00:11:48Z"}
{"rideId":3322,"taxiId":2013003190,"isStart":true,"lon":-73.98382,"lat":40.74381,"psgCnt":1,"rideTime":"2013-01-01T00:11:48Z"}
{"rideId":3323,"taxiId":2013003191,"isStart":true,"lon":-74.00485,"lat":40.72102,"psgCnt":4,"rideTime":"2013-01-01T00:11:48Z"}
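Each line is a JSON-encoded taxi ride event; a record can be inspected with the standard json module (field names are taken from the sample output above):

```python
import json

# One record copied from the sample output
record = '{"rideId":3321,"taxiId":2013003189,"isStart":true,"lon":-73.99606,"lat":40.725132,"psgCnt":2,"rideTime":"2013-01-01T00:11:47Z"}'
ride = json.loads(record)
print(ride["rideId"], ride["isStart"], ride["psgCnt"])  # 3321 True 2
```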

Stop job:

Visit http://localhost:8081/#/overview, select the running job and click the Cancel button.

3-Python UDF

Code: 3-udf_add.py

Run:

cd playgrounds
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/table/3-udf_add.py

A result file will be added in the path /opt/examples/table/output/udf_add_output

Check Results:

docker-compose exec taskmanager cat /opt/examples/table/output/udf_add_output/part-933b41cd-9388-4ba8-9437-cbf5f87c2469-cp-0-task-0-file-0

The results look like:

3
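The core of the UDF is an ordinary Python function; a minimal sketch of the add logic (in the real job it is declared as a PyFlink scalar UDF and registered with the table environment — the exact registration is in 3-udf_add.py):

```python
def add(i: int, j: int) -> int:
    # In the PyFlink job this function would be wrapped as a scalar UDF,
    # e.g. with @udf(result_type=DataTypes.BIGINT()).
    return i + j

print(add(1, 2))  # 3, matching the result above
```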

4-Python UDF with dependency

Code: 4-udf_add_with_dependency.py

Check the Python Dependency Management documentation for more details about how to handle Python UDF dependencies.

Run:

cd playgrounds
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/table/4-udf_add_with_dependency.py

A result file will be added in the path /opt/examples/table/output/udf_add_with_dependency_output

Check Results:

docker-compose exec taskmanager cat /opt/examples/table/output/udf_add_with_dependency_output/part-589bdd40-8cfe-4f50-9484-ae46629e0a90-0-0

The results look like:

3

5-Pandas UDF

Code: 5-pandas_udf_add.py

Run:

cd playgrounds
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/table/5-pandas_udf_add.py

A result file will be added in the path /opt/examples/table/output/pandas_udf_add_output/

Check Results:

docker-compose exec taskmanager cat /opt/examples/table/output/pandas_udf_add_output/part-1e9a35a7-28c3-4a46-bb84-a2fb1d62e0ed-cp-0-task-0-file-0

The results look like:

3
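A Pandas UDF receives whole `pandas.Series` batches instead of scalar values, which allows vectorized computation. The add logic can be sketched as follows (hedged — the actual decorator arguments are in 5-pandas_udf_add.py):

```python
import pandas as pd

def pandas_add(i: pd.Series, j: pd.Series) -> pd.Series:
    # Vectorized addition over a batch of rows; in the PyFlink job this
    # would be declared with @udf(..., func_type="pandas").
    return i + j

result = pandas_add(pd.Series([1]), pd.Series([2]))
print(int(result.iloc[0]))  # 3
```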

6-Python UDF with metrics

Code: 6-udf_metrics.py

Run:

cd playgrounds
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/table/6-udf_metrics.py

Visit http://localhost:8081/#/overview, select the running job and check the metrics.

7-Python UDF used in Java Table API jobs

Code: BlinkBatchPythonUdfSqlJob.java

Compile:

cd examples/table/java
mvn clean package
cd -

Run:

docker-compose exec jobmanager ./bin/flink run -d -j /opt/examples/table/java/target/pyflink-playgrounds.jar -c BlinkBatchPythonUdfSqlJob -pyfs /opt/examples/table/utils/udfs.py

8-Python UDF used in pure-SQL jobs

SQL resource file: sql-client.yaml

SQL Statement:

insert into sink select add_one(a) from (VALUES (1), (2), (3)) as source (a)

Run:

docker-compose exec jobmanager ./bin/sql-client.sh embedded --environment /opt/examples/table/sql/sql-client.yaml -pyfs /opt/examples/table/utils/udfs.py --update "insert into sink select add_one(a) from (VALUES (1), (2), (3)) as source (a)"

Check Results: A result file will be added in the path /opt/examples/table/output/sql-test-out/, with the following content:

2
3
4

9-PyFlink DataStream API WordCount

Code: 9-data_stream_word_count.py

Run:

cd playgrounds
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/datastream/batch/9-data_stream_word_count.py

Check Results:

A result file will be added in the path /opt/examples/datastream/output/data_stream_word_count:

docker-compose exec taskmanager cat /opt/examples/datastream/output/data_stream_word_count/2021-04-14--03/pre-fa447e19-a6ad-42ca-966e-3a564c7fffde-0suf

The results look like:

+I[flink, 2]
+I[pyflink, 1]

10-PyFlink DataStream API ProcessFunction

Code: 10-data_stream_process_function.py

Run:

cd playgrounds
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/datastream/batch/10-data_stream_process_function.py

Check Results:

A result file will be added in the path /opt/examples/datastream/output/data_stream_process_function_demo.

11-State Access in PyFlink DataStream API

Code: 11-data_stream_state_access.py

Run:

cd playgrounds
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/datastream/batch/11-data_stream_state_access.py

Check Results:

A result file will be added in the path /opt/examples/datastream/output/state_access:

docker-compose exec taskmanager cat /opt/examples/datastream/output/state_access/2021-04-14--09/pre-7b83235f-4737-4b2f-9af7-9de0e7d4a890-0suf

The results look like:

2
4
2
4
5
2
4
5
2
4
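State access keeps a per-key value across elements of a keyed stream. The pattern can be sketched in plain Python (a hypothetical emulation, using a dict in place of PyFlink's ValueState — the real example reads and updates the state inside a function applied to the keyed stream):

```python
state = {}  # per-key state, standing in for PyFlink's ValueState

def count_per_key(key: str) -> int:
    # Read the current state for this key, update it, and emit the new count.
    current = state.get(key, 0) + 1
    state[key] = current
    return current

print([count_per_key(k) for k in ["a", "b", "a", "a"]])  # [1, 1, 2, 3]
```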

playgrounds's People

Contributors

dianfu, hequn8128, one-matrix, weizhong94


playgrounds's Issues

Is there an open-use license on the image?

I do not see an open-use license on the image. I am doing a POC with PyFlink and found the image to fit my use case exactly, so I am checking whether this image can be used and whether it has an open source license. Could you please add the license to the repo and to the image on Docker?

PyFlink 1.11 support

I noticed there are enhancements in Flink 1.11, especially for Kubernetes deployment. Is it possible to upgrade the current playground to support Flink 1.11?

I am looking forward to getting your feedback soon.

George Hu
[email protected]

1-word_count.py fails with an error

When I run:
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/1-word_count.py
I get the following error:
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:87) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
I tried switching python to python3, but that did not solve the problem. Has anyone run into the same error?

ERROR [13/15] ADD ./flink-1.13.3 /opt/flink

There is an error "ERROR [13/15] ADD ./flink-1.13.3 /opt/flink" when I execute docker build. Could you advise how to fix this issue? Thanks!

ls -ltr
total 460632
drwxr-xr-x 12 baineng staff 384 10 13 00:31 apache-flink-libraries-1.13.3
drwxr-xr-x 11 baineng staff 352 10 13 00:31 apache-flink-1.13.3
-rw-r--r-- 1 baineng staff 219702327 10 13 00:42 apache-flink-libraries-1.13.3.tar.gz
-rw-r--r-- 1 baineng staff 912721 10 13 00:42 apache-flink-1.13.3.tar.gz
-rwxr-xr-x 1 baineng staff 4163 11 22 14:26 docker-entrypoint.sh
-rw-r--r-- 1 baineng staff 249 11 22 14:26 requirements.txt
-rw-r--r-- 1 baineng staff 3613 11 22 15:16 Dockerfile
baineng@baineng-mac image % docker build --tag pyflink/playgrounds:1.13.0-rc2 .
[+] Building 77.2s (3/3) FINISHED
=> [internal] load build definition from Dockerfile 0.0s
=> => transferring dockerfile: 37B 0.0s
=> [internal] load .dockerignore 0.0s
=> => transferring context: 2B 0.0s
=> ERROR [internal] load metadata for docker.io/library/openjdk:8-jre 77.1s

[internal] load metadata for docker.io/library/openjdk:8-jre:


failed to solve with frontend dockerfile.v0: failed to create LLB definition: failed to do request: Head "https://registry-1.docker.io/v2/library/openjdk/manifests/8-jre": Service Unavailable
baineng@baineng-mac image % docker build --tag pyflink/playgrounds:1.13.0-rc2 .
[+] Building 4.3s (18/19)
=> [internal] load build definition from Dockerfile 0.0s
=> => transferring dockerfile: 37B 0.0s
=> [internal] load .dockerignore 0.0s
=> => transferring context: 2B 0.0s
=> [internal] load metadata for docker.io/library/openjdk:8-jre 4.2s
=> CANCELED [ 1/15] FROM docker.io/library/openjdk:8-jre@sha256:731f69f1d6a7dcb314deba3f349361e35eae7e97647f7163a65ca2f17dddaf6a 0.0s
=> => resolve docker.io/library/openjdk:8-jre@sha256:731f69f1d6a7dcb314deba3f349361e35eae7e97647f7163a65ca2f17dddaf6a 0.0s
=> => sha256:731f69f1d6a7dcb314deba3f349361e35eae7e97647f7163a65ca2f17dddaf6a 1.29kB / 1.29kB 0.0s
=> [internal] load build context 0.0s
=> => transferring context: 183B 0.0s
=> CACHED [ 2/15] RUN set -ex; apt-get update; apt-get -y install libsnappy1v5; rm -rf /var/lib/apt/lists/* 0.0s
=> CACHED [ 3/15] RUN set -ex; apt-get update; apt-get -y install python3; apt-get -y install python3-pip; apt-get -y install python3-dev; ln -s /usr/bin/python3 /usr/bin/ 0.0s
=> CACHED [ 4/15] COPY ./requirements.txt /opt 0.0s
=> CACHED [ 5/15] RUN set -ex; apt-get update; python -m pip install --upgrade pip; pip install -r /opt/requirements.txt 0.0s
=> CACHED [ 6/15] COPY ./apache-flink-libraries-1.13.3.tar.gz /opt 0.0s
=> CACHED [ 7/15] RUN set -ex; python -m pip install /opt/apache-flink-libraries-1.13.3.tar.gz rm -f /opt/apache-flink-libraries-1.13.3.tar.gz 0.0s
=> CACHED [ 8/15] COPY ./apache-flink-1.13.3.tar.gz /opt 0.0s
=> CACHED [ 9/15] RUN set -ex; python -m pip install /opt/apache-flink-1.13.3.tar.gz rm -f /opt/apache-flink-1.13.3.tar.gz 0.0s
=> CACHED [10/15] RUN set -ex; wget -nv -O /usr/local/bin/gosu "https://github.com/tianon/gosu/releases/download/1.11/gosu-$(dpkg --print-architecture)"; wget -nv -O /usr/local/ 0.0s
=> CACHED [11/15] RUN groupadd --system --gid=9999 flink && useradd --system --home-dir /opt/flink --uid=9999 --gid=flink flink 0.0s
=> CACHED [12/15] WORKDIR /opt/flink 0.0s
=> ERROR [13/15] ADD ./flink-1.13.3 /opt/flink 0.0s
=> ERROR [14/15] COPY ./flink-sql-connector-kafka_2.11-1.13.3.jar /opt/flink/lib/ 0.0s

[13/15] ADD ./flink-1.13.3 /opt/flink:



[14/15] COPY ./flink-sql-connector-kafka_2.11-1.13.3.jar /opt/flink/lib/:


failed to compute cache key: "/flink-sql-connector-kafka_2.11-1.13.3.jar" not found: not found
baineng@baineng-mac image % ls -ltr
total 460632
drwxr-xr-x 12 baineng staff 384 10 13 00:31 apache-flink-libraries-1.13.3
drwxr-xr-x 11 baineng staff 352 10 13 00:31 apache-flink-1.13.3
-rw-r--r-- 1 baineng staff 219702327 10 13 00:42 apache-flink-libraries-1.13.3.tar.gz
-rw-r--r-- 1 baineng staff 912721 10 13 00:42 apache-flink-1.13.3.tar.gz
-rwxr-xr-x 1 baineng staff 4163 11 22 14:26 docker-entrypoint.sh
-rw-r--r-- 1 baineng staff 249 11 22 14:26 requirements.txt
-rw-r--r-- 1 baineng staff 3613 11 22 15:16 Dockerfile
baineng@baineng-mac image % docker build --tag pyflink/playgrounds:1.13.0-rc2 .
[+] Building 1.5s (18/19)
=> [internal] load build definition from Dockerfile 0.0s
=> => transferring dockerfile: 37B 0.0s
=> [internal] load .dockerignore 0.0s
=> => transferring context: 2B 0.0s
=> [internal] load metadata for docker.io/library/openjdk:8-jre 1.4s
=> [internal] load build context 0.0s
=> => transferring context: 183B 0.0s
=> CANCELED [ 1/15] FROM docker.io/library/openjdk:8-jre@sha256:731f69f1d6a7dcb314deba3f349361e35eae7e97647f7163a65ca2f17dddaf6a 0.0s
=> => resolve docker.io/library/openjdk:8-jre@sha256:731f69f1d6a7dcb314deba3f349361e35eae7e97647f7163a65ca2f17dddaf6a 0.0s
=> => sha256:731f69f1d6a7dcb314deba3f349361e35eae7e97647f7163a65ca2f17dddaf6a 1.29kB / 1.29kB 0.0s
=> => sha256:07a7f781348228a0bf3e18d853f1ea79ca71dcc3373e5fc3cef371cda4ae046c 1.58kB / 1.58kB 0.0s
=> => sha256:0e7f84dc82a402fecfdfc9ba03dfdb36cf6afde40292e2b013b64a578f5e0f68 7.43kB / 7.43kB 0.0s
=> CACHED [ 2/15] RUN set -ex; apt-get update; apt-get -y install libsnappy1v5; rm -rf /var/lib/apt/lists/* 0.0s
=> CACHED [ 3/15] RUN set -ex; apt-get update; apt-get -y install python3; apt-get -y install python3-pip; apt-get -y install python3-dev; ln -s /usr/bin/python3 /usr/bin/ 0.0s
=> CACHED [ 4/15] COPY ./requirements.txt /opt 0.0s
=> CACHED [ 5/15] RUN set -ex; apt-get update; python -m pip install --upgrade pip; pip install -r /opt/requirements.txt 0.0s
=> CACHED [ 6/15] COPY ./apache-flink-libraries-1.13.3.tar.gz /opt 0.0s
=> CACHED [ 7/15] RUN set -ex; python -m pip install /opt/apache-flink-libraries-1.13.3.tar.gz rm -f /opt/apache-flink-libraries-1.13.3.tar.gz 0.0s
=> CACHED [ 8/15] COPY ./apache-flink-1.13.3.tar.gz /opt 0.0s
=> CACHED [ 9/15] RUN set -ex; python -m pip install /opt/apache-flink-1.13.3.tar.gz rm -f /opt/apache-flink-1.13.3.tar.gz 0.0s
=> CACHED [10/15] RUN set -ex; wget -nv -O /usr/local/bin/gosu "https://github.com/tianon/gosu/releases/download/1.11/gosu-$(dpkg --print-architecture)"; wget -nv -O /usr/local/ 0.0s
=> CACHED [11/15] RUN groupadd --system --gid=9999 flink && useradd --system --home-dir /opt/flink --uid=9999 --gid=flink flink 0.0s
=> CACHED [12/15] WORKDIR /opt/flink 0.0s
=> ERROR [13/15] ADD ./flink-1.13.3 /opt/flink 0.0s
=> ERROR [14/15] COPY ./flink-sql-connector-kafka_2.11-1.13.3.jar /opt/flink/lib/ 0.0s

[13/15] ADD ./flink-1.13.3 /opt/flink:



[14/15] COPY ./flink-sql-connector-kafka_2.11-1.13.3.jar /opt/flink/lib/:


failed to compute cache key: "/flink-sql-connector-kafka_2.11-1.13.3.jar" not found: not found

1.11 TypeError: Could not found the Java class 'Kafka'.

docker-compose exec jobmanager ./bin/flink run -py /opt/examples/2-from_kafka_to_kafka.py
Traceback (most recent call last):
File "/opt/examples/2-from_kafka_to_kafka.py", line 98, in
from_kafka_to_kafka_demo()
File "/opt/examples/2-from_kafka_to_kafka.py", line 21, in from_kafka_to_kafka_demo
register_rides_source(st_env)
File "/opt/examples/2-from_kafka_to_kafka.py", line 34, in register_rides_source
Kafka()
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/descriptors.py", line 900, in init
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 184, in wrapped_call
TypeError: Could not found the Java class 'Kafka'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

1.11 taskmanager_1 | 2020-10-29 05:26:36,073 main ERROR Could not create plugin of type class

taskmanager_1 | 2020-10-29 05:26:36,073 main ERROR Could not create plugin of type class org.apache.logging.log4j.core.appender.RollingFileAppender for element RollingFile: java.lang.IllegalStateException: ManagerFactory [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$RollingFileManagerFactory@64c87930] unable to create manager for [/opt/flink/log/flink--taskexecutor-0-e7be7ed60128.log] with data [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$FactoryData@400cff1a[pattern=/opt/flink/log/flink--taskexecutor-0-e7be7ed60128.log.%i, append=false, bufferedIO=true, bufferSize=8192, policy=CompositeTriggeringPolicy(policies=[SizeBasedTriggeringPolicy(size=104857600)]), strategy=DefaultRolloverStrategy(min=1, max=10, useMax=true), advertiseURI=null, layout=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n, filePermissions=null, fileOwner=null]] java.lang.IllegalStateException: ManagerFactory [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$RollingFileManagerFactory@64c87930] unable to create manager for [/opt/flink/log/flink--taskexecutor-0-e7be7ed60128.log] with data [org.apache.logging.log4j.core.appender.rolling.RollingFileManager$FactoryData@400cff1a[pattern=/opt/flink/log/flink--taskexecutor-0-e7be7ed60128.log.%i, append=false, bufferedIO=true, bufferSize=8192, policy=CompositeTriggeringPolicy(policies=[SizeBasedTriggeringPolicy(size=104857600)]), strategy=DefaultRolloverStrategy(min=1, max=10, useMax=true), advertiseURI=null, layout=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n, filePermissions=null, fileOwner=null]]

error "java.io.IOException: Mkdirs failed to create" when running the example 1-word_count.py

Job has been submitted with JobID d6cca72a392a0ff68ac8cc285ebed1b9
Traceback (most recent call last):
File "/opt/examples/table/1-word_count.py", line 34, in
t_env.execute("1-word_count")
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1315, in execute
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in call
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d6cca72a392a0ff68ac8cc285ebed1b9)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1459)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d6cca72a392a0ff68ac8cc285ebed1b9)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.table.api.TableException: Exception in writeRecord
at org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:131)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at HashAggregateWithKeys$71.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:423)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Mkdirs failed to create /opt/examples/table/output/word_count_output/.staging_1621992750053/cp-0/task-0
at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:262)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:130)
at org.apache.flink.table.filesystem.FileSystemTableSink$4.open(FileSystemTableSink.java:456)
at org.apache.flink.table.filesystem.PartitionWriter$Context.createNewOutputFormat(PartitionWriter.java:61)
at org.apache.flink.table.filesystem.SingleDirectoryWriter.createFormat(SingleDirectoryWriter.java:57)
at org.apache.flink.table.filesystem.SingleDirectoryWriter.write(SingleDirectoryWriter.java:67)
at org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:129)
... 19 more

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
... 13 more

Pyflink for CEP

Hi

I would like to ask whether PyFlink supports the Pattern API (CEP) that is available in the Java API. Thank you.

Created a new MySQL job, and it doesn't work. There may be something wrong with the IntSerializer, according to the bug report below.

Container info:

(base) [root@iz8vb6evwfagx3tyjx4fl8z data]# docker ps
CONTAINER ID        IMAGE                                                     COMMAND                  CREATED             STATUS              PORTS                                                NAMES
efc1d310eb14        pyflink/playgrounds:1.10.0                                "/docker-entrypoint.…"   3 weeks ago         Up 3 weeks          6121-6123/tcp, 8081/tcp                              playgrounds_taskmanager_1
af466f98e914        wurstmeister/kafka:2.12-2.2.1                             "start-kafka.sh"         3 weeks ago         Up 3 weeks          0.0.0.0:32768->9092/tcp                              playgrounds_kafka_1
637b020df406        mysql                                                     "docker-entrypoint.s…"   3 weeks ago         Up 2 days           0.0.0.0:3306->3306/tcp, 33060/tcp                    playgrounds_db_1
6cc0b28bb45a        pyflink/playgrounds:1.10.0                                "/docker-entrypoint.…"   3 weeks ago         Up 3 weeks          6123/tcp, 8081/tcp, 0.0.0.0:8003->8088/tcp           playgrounds_jobmanager_1
2ef57cfa9494        docker.elastic.co/elasticsearch/elasticsearch-oss:6.3.1   "/usr/local/bin/dock…"   3 weeks ago         Up 3 weeks          0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp       playgrounds_elasticsearch_1
8087b3554674        wurstmeister/zookeeper:3.4.6                              "/bin/sh -c '/usr/sb…"   3 weeks ago         Up 3 weeks          22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   playgrounds_zookeeper_1
cfbfb51770b8        adminer                                                   "entrypoint.sh docke…"   3 weeks ago         Up 3 weeks          0.0.0.0:8080->8080/tcp                               playgrounds_adminer_1
42a9f050933b        puckel/docker-airflow                                     "/entrypoint.sh webs…"   4 weeks ago         Up 4 weeks          5555/tcp, 8793/tcp, 0.0.0.0:8004->8080/tcp           gallant_villani

SQL for the table and data:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for stu
-- ----------------------------
DROP TABLE IF EXISTS `stu`;
CREATE TABLE `stu`  (
  `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto-increment id',
  `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'student name',
  `school` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'school name',
  `nickname` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'student nickname',
  `age` int(11) NOT NULL COMMENT 'student age',
  `class_num` int(11) NOT NULL COMMENT 'class size',
  `score` decimal(4, 2) NOT NULL COMMENT 'score',
  `phone` bigint(20) NOT NULL COMMENT 'phone number',
  `email` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'home email',
  `ip` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'IP address',
  `address` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT 'home address',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1000001 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of stu
-- ----------------------------
INSERT INTO `stu` VALUES (1, 'Ukq', '复旦附中', '敌法师', 12, 59, 38.89, 15573293938, '[email protected]', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (2, 'Ukq', '上海中学', '幽鬼', 12, 59, 38.89, 15573293938, '[email protected]', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (3, 'Ukq', '人和中心', '敌法师', 12, 59, 38.89, 15573293938, '[email protected]', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (4, 'Ukq', '广东中学', '影魔', 12, 59, 38.89, 15573293938, '[email protected]', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (5, 'kulwWkwsZ', '广东中学', '鬼泣', 12, 55, 99.59, 15789193091, '[email protected]', '192.4.223.99', '**维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (6, 'kulwWkwsZ', '猪场', '影魔', 12, 55, 99.59, 15789193091, '[email protected]', '192.4.223.99', '**维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (7, 'kulwWkwsZ', '鹅厂', '影魔', 12, 55, 99.59, 15789193091, '[email protected]', '192.4.223.99', '**维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (8, 'kulwWkwsZ', '上海中学', '影魔', 12, 55, 99.59, 15789193091, '[email protected]', '192.4.223.99', '**维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (9, 'eHFOyHtIGfiduV', '旧大院', '高小王子', 82, 43, 90.96, 18506504233, '[email protected]', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (10, 'eHFOyHtIGfiduV', '人和中心', '影魔', 82, 43, 90.96, 18506504233, '[email protected]', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (11, 'eHFOyHtIGfiduV', '上海中学', '歌神', 82, 43, 90.96, 18506504233, '[email protected]', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (12, 'eHFOyHtIGfiduV', '华师大附中', '影魔', 82, 43, 90.96, 18506504233, '[email protected]', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (13, 'kWZjrT', '猪场', '逗比', 94, 63, 94.24, 18870125400, '[email protected]', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');
INSERT INTO `stu` VALUES (14, 'kWZjrT', '华师大二附中', '鬼泣', 94, 63, 94.24, 18870125400, '[email protected]', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');
INSERT INTO `stu` VALUES (15, 'kWZjrT', '清华中学', '影魔', 94, 63, 94.24, 18870125400, '[email protected]', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');
INSERT INTO `stu` VALUES (16, 'Jec', '华师大二附中', '高小王子', 38, 68, 51.16, 13248136245, '[email protected]', '198.51.103.171', '云南省西安市西峰张家港路j座 882348');
INSERT INTO `stu` VALUES (17, 'kWZjrT', '猪场', '高小王子', 94, 63, 94.24, 18870125400, '[email protected]', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');

mysql_transfer.py:

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)

# use blink table planner
st_env = StreamTableEnvironment \
    .create(s_env, environment_settings=EnvironmentSettings
            .new_instance()
            .in_streaming_mode()
            .use_blink_planner().build())


source_ddl = """CREATE TABLE StuSourceTable (id int , name varchar, school varchar, nickname varchar, age int, class_num int, score decimal, phone int, email varchar, ip varchar) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://db:3306/flink-test',
        'connector.table' = 'stu',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = 'example')
"""

sink_ddl = """CREATE TABLE StuSinkTable (
    id int,
    name varchar, 
    school varchar,
    age int
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://db:3306/flink-test',
        'connector.table' = 'stu_result',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = 'example')
"""

st_env.sql_update(source_ddl)
st_env.sql_update(sink_ddl)

t = st_env.from_path('StuSourceTable')
data = t.select("id,name,school,age")
data.insert_into('StuSinkTable')

st_env.execute("mysql_transfer")

Command used to run the job:

docker-compose exec jobmanager ./bin/flink run -py /opt/examples/mysql_transfer.py

Error logs:

2020-07-08 09:54:41
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
	at org.apache.flink.api.common.typeutils.base.IntSerializer.copy(IntSerializer.java:32)
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
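The root cause is visible in the `Caused by` line above: `java.lang.Long cannot be cast to java.lang.Integer`. The DDL declares `phone` as `int`, but the MySQL column is `bigint(20)`, so the JDBC source produces `Long` values that the `IntSerializer` cannot copy. A sketch of a corrected source DDL (same connector options as in `mysql_transfer.py`; the `DECIMAL` precision is also made explicit to match `decimal(4, 2)`):

```python
# Sketch of a corrected source DDL: column types aligned with the MySQL `stu`
# schema shown above. `phone` must be BIGINT (it is bigint(20) in MySQL), and
# `score` gets an explicit precision to match decimal(4, 2).
source_ddl = """CREATE TABLE StuSourceTable (
    id INT,
    name VARCHAR,
    school VARCHAR,
    nickname VARCHAR,
    age INT,
    class_num INT,
    score DECIMAL(4, 2),
    phone BIGINT,
    email VARCHAR,
    ip VARCHAR
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://db:3306/flink-test',
    'connector.table' = 'stu',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = 'example')
"""
```

The general rule of thumb: every column type in the Flink DDL has to match the JDBC type mapping of the underlying MySQL column, or the row serializer will fail at runtime exactly as in the trace above.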

Examples using file system fail - Permission Denied

When trying to run the examples that use the file system as a sink I get permission errors:

Caused by: java.io.FileNotFoundException: /opt/examples/data/word_count_output (Permission denied)

This happens for examples 1, 3 and 4.

The other examples work fine (e.g. 5-word_count-mysql.py).

I have not done anything other than docker-compose up and running the example code.

It looks like the flink user, which the Java process runs as, does not have access to the mounted volumes.

2020-05-25 17:30:26
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.FileNotFoundException: /opt/examples/data/word_count_output (Permission denied)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
	at org.apache.flink.core.fs.local.LocalDataOutputStream.<init>(LocalDataOutputStream.java:47)
	at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:273)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
	at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248)
	at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)
	at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
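One workaround, assuming the cause is indeed that the container's flink user cannot write to the bind-mounted volume, is to pre-create the output directory on the host and open up its permissions before running the example (adjust the path to match the example being run):

```shell
# Assumption: the repo is checked out in the current directory and
# examples/data is the directory bind-mounted into the containers.
mkdir -p examples/data/word_count_output
chmod -R 777 examples/data
```

This is a sketch for a local playground only; world-writable permissions are not appropriate outside a throwaway environment.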

Error when running examples/2-from_kafka_to_kafka.py

(base) @MacBook-Pro-2 playgrounds % docker-compose exec jobmanager ./bin/flink run -py /opt/examples/2-from_kafka_to_kafka.py
Traceback (most recent call last):
File "/opt/examples/2-from_kafka_to_kafka.py", line 98, in <module>
from_kafka_to_kafka_demo()
File "/opt/examples/2-from_kafka_to_kafka.py", line 21, in from_kafka_to_kafka_demo
register_rides_source(st_env)
File "/opt/examples/2-from_kafka_to_kafka.py", line 34, in register_rides_source
Kafka()
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/descriptors.py", line 900, in __init__
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 184, in wrapped_call
TypeError: Could not found the Java class 'Kafka'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
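As the TypeError itself suggests, the Kafka connector classes are not on the classpath; the jar can be supplied via `--jarfile` (or the `pipeline.jars` config option). A sketch of the command, where the jar path is an assumption:

```shell
# The jar name/path below is an assumption; substitute the Kafka SQL connector
# jar that matches the Flink version inside the image.
docker-compose exec jobmanager ./bin/flink run \
  --jarfile /opt/flink/lib/flink-sql-connector-kafka.jar \
  -py /opt/examples/2-from_kafka_to_kafka.py
```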

py4j.protocol.Py4JError when running PyFlink

py4j.protocol.Py4JError: org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM

I don't know what this error actually means; I'm a beginner. The .py file is as follows:
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment


def data_stream_api_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)
    ds = env.from_collection(
        collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))

    def split(s):
        splits = s[1].split("|")
        for sp in splits:
            yield s[0], sp

    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
        .flat_map(split) \
        .key_by(lambda i: i[1]) \
        .reduce(lambda i, j: (i[0] + j[0], i[1]))
    ds.print()
    env.execute()


if __name__ == '__main__':
    data_stream_api_demo()

It is probably a problem with how I am using it.
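Setting the Py4J error aside, a plain-Python sketch of what the pipeline in the snippet above computes (map, flat_map, then a keyed sum) can help to check expectations once the environment issue is resolved:

```python
from collections import defaultdict

# The same data and transformations as the PyFlink pipeline above, in plain
# Python, so the expected result can be checked independently of Flink.
data = [(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')]

# map: increment the numeric field
mapped = [(n + 1, s) for n, s in data]

# flat_map: emit one (count, word) pair per '|'-separated token
flat = [(n, w) for n, s in mapped for w in s.split('|')]

# key_by + reduce: sum the counts per word
totals = defaultdict(int)
for n, w in flat:
    totals[w] += n

print(dict(totals))  # {'aaa': 6, 'bb': 5, 'a': 7}
```

In the Flink job each keyed `reduce` emits a running total per word, so the final values per key match the sums computed here.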

dockerignore stat ./flink-1.13.0 does not exist

When I execute the command "docker build --tag pyflink/playgrounds:1.13.0-rc0 ." this error occurs:
dockerignore stat ./flink-1.13.0 does not exist.
I have downloaded the files
apache-flink-libraries-1.13.0.tar.gz and apache-flink-1.13.0.tar.gz into the image directory.
Waiting for your answer, thanks a lot.

MySQL fails to start and the Flink job cannot be executed

docker-compose exec db mysql -u root -pexample
mysql: [Warning] Using a password on the command line interface can be insecure.
ERROR 2002 (HY000): Can't connect to local MySQL server through socket '/var/run/mysqld/mysqld.sock' (2)
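ERROR 2002 typically means the mysql client tried the unix socket before the server inside the container finished initializing, or that only TCP is available. A hedged workaround is to wait for startup and then force a TCP connection:

```shell
# Force TCP instead of the unix socket: -h 127.0.0.1 makes the mysql client
# use a network connection to the server inside the db container.
docker-compose exec db mysql -h 127.0.0.1 -P 3306 -u root -pexample
```

If this still fails, `docker-compose logs db` should show whether the server is actually up.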

Error when running example 1-word_count.py

Hi,
I followed the README. After starting Docker and running the example, an error occurred.
Can anyone help?

ubuntu@VM-0-8-ubuntu:~/playgrounds$ docker-compose exec jobmanager ./bin/flink run -py /opt/examples/table/1-word_count.py
Job has been submitted with JobID 0a2a94133f5b8d1216e99b59f8e6d843
Traceback (most recent call last):
  File "/opt/examples/table/1-word_count.py", line 34, in <module>
    t_env.execute("1-word_count")
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1315, in execute
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0a2a94133f5b8d1216e99b59f8e6d843)
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
	at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1459)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 0a2a94133f5b8d1216e99b59f8e6d843)
	at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
	... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.table.api.TableException: Exception in writeRecord
	at org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:131)
	at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
	at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at HashAggregateWithKeys$71.endInput(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:423)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Mkdirs failed to create /opt/examples/table/output/word_count_output/.staging_1627632423381/cp-0/task-0
	at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:262)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:130)
	at org.apache.flink.table.filesystem.FileSystemTableSink$4.open(FileSystemTableSink.java:456)
	at org.apache.flink.table.filesystem.PartitionWriter$Context.createNewOutputFormat(PartitionWriter.java:61)
	at org.apache.flink.table.filesystem.SingleDirectoryWriter.createFormat(SingleDirectoryWriter.java:57)
	at org.apache.flink.table.filesystem.SingleDirectoryWriter.write(SingleDirectoryWriter.java:67)
	at org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:129)
	... 19 more

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
	at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
	at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
	... 13 more

Error when running example 2 (2-from_kafka_to_kafka.py)

Job has been submitted with JobID 6cc03bd88ca1d89828729a826dc71ee1
Traceback (most recent call last):
File "/opt/examples/table/2-from_kafka_to_kafka.py", line 96, in <module>
from_kafka_to_kafka_demo()
File "/opt/examples/table/2-from_kafka_to_kafka.py", line 26, in from_kafka_to_kafka_demo
st_env.execute("2-from_kafka_to_kafka")
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1315, in execute
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in call
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 6cc03bd88ca1d89828729a826dc71ee1)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1459)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 6cc03bd88ca1d89828729a826dc71ee1)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
... 13 more

C:\playgrounds>
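The root cause in the trace above is `org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata`, which usually means the Kafka broker configured in the job was not reachable from the container running the example. As a quick hedged diagnostic (not part of the playgrounds repo), the sketch below checks whether a TCP connection to the broker address can be opened at all; the host and port are assumptions — substitute whatever bootstrap server address the `docker-compose` setup exposes:

```python
import socket

def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout.

    This only verifies network reachability; it does not prove the broker
    is healthy or that the expected topic exists.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    # "kafka:9092" is a guess at the compose service address; adjust to
    # match the bootstrap.servers value used in 2-from_kafka_to_kafka.py.
    print(broker_reachable("kafka", 9092))
```

If this returns `False` from inside the jobmanager container, the problem is the network setup (e.g. the Kafka service not yet started, or a wrong hostname in the connector properties) rather than the PyFlink job itself.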
