天道酬勤,学无止境

Reading a csv file in batch mode using pyflink from local system

I was trying to read an established csv file while writing a pyflink job. I was using filesystem connector to get the data but after executing execute_sql() on the ddl and later doing query on the table I was getting an error which explains that it isn't able to fetch the next result. I am unable to resolve this error. I have checked the csv file and it's fully correct and working with pandas but here I don't why it isn't able to fetch the next line. For reference please find the attached code.

from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, BatchTableEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
import pandas as pd
from inspect import getmembers, isfunction
import os
## CREATE THE ENVIRONMENT

# create a blink batch TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)
#connector for ingesting the data
source_ddl = """
                CREATE TABLE MyUserTable (
                        timestamp_info TIMESTAMP(3),
                        column_a FLOAT,
                        column_b FLOAT,
                        column_c INT,
                        ) WITH (
                          'connector' = 'filesystem',          
                          'path' = 'file:///Users/abc/Projects/temp.csv', 
                          'format' = 'csv'

                        )"""


#connector for data output/sink
sink_ddl = """
                CREATE TABLE results (
                            timestamp_info TIMESTAMP(3),
                            score FLOAT)
                            WITH (
                                'connector' = 'filesystem',
                                'path' = 'file:///Users/abc/Projects/results.csv',
                                'format' = 'csv'
                            )"""

#make the table corresponding to the schema mentioned
source_table = table_env.execute_sql(source_ddl)
sink_table = table_env.execute_sql(sink_ddl)

#convert the sql table to table API
table_path = table_env.from_path("MyUserTable")

# execute SELECT statement
table_result2 = table_env.execute_sql("SELECT timestamp_info,column_a FROM MyUserTable")
table_result2.print()

Error that was coming :-

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/pyflink/lib/flink-dist_2.11-1.12.2.jar) to field java.lang.Class.ANNOTATION
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
+-------------------------+--------------------------------+
|          timestamp_info |                       column_a |
+-------------------------+--------------------------------+
Traceback (most recent call last):
  File "local_implementation.py", line 51, in <module>
    table_result2.print()
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/pyflink/table/table_result.py", line 219, in print
    self._j_table_result.print()
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
    return_value = get_return_value(
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o46.print.
: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
    at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
    at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2086)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167)
    ... 18 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
    at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2158)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:114)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:166)
    ... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Failed to deserialize CSV row.
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:257)
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:162)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
    at java.sql/java.sql.Timestamp.valueOf(Timestamp.java:196)
    at org.apache.flink.formats.csv.CsvToRowDataConverters.convertToTimestamp(CsvToRowDataConverters.java:250)
    at org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createNullableConverter$ac6e531e$1(CsvToRowDataConverters.java:113)
    at org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createRowConverter$18bb1dd$1(CsvToRowDataConverters.java:98)
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:251)

评论

Timestamp format was not matching with the time index in the csv file. Further while reading the csv file like this it will take the first row and try to parse it and if your csv file had headers in it then it will give error because it expected TIMESTAMP format at first but got a string which can't be parsed to the required format. Hence, ensure that the csv file had no headers in it.

受限制的 HTML

  • 允许的HTML标签:<a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • 自动断行和分段。
  • 网页和电子邮件地址自动转换为链接。

相关推荐
  • Flink 1.12.0学习与分享(pyflink)
    Flink 1.12.0学习与分享 1. 大数据实时计算引擎历史 第一代, 以Storm为代表, 高吞吐,低延迟,但精确一次消费以及开发维护便捷性,生态完善度等相对欠缺一些.第二代,以Spark 为代表, 高吞吐, 牺牲了一些延迟(微批次理念), 结合第三方框架, 可以很好实现精确一次消费. 开发维护便捷性, 生态完善度都非常好.第三代, 以Flink为代表, 设计时就以实时计算为出发点, 高吞吐,低延迟,精确一次消费语义支持, 开发维护便捷性, 生态完善度都非常好 生态完善度: 各类编程语言支持, SQL支持, 和其他大数据框架集成如Hadoop, 消息队列如Kafka, Hive, Hbase, Mysql, 监控体系等等 2. Flink是什么 在无界和有界数据流上进行状态计算的框架和分布式处理引擎。 Flink 已经可以在所有常见的集群环境中运行,并以 in-memory 的速度和任意的规模进行计算. 注意, 在大数据计算引擎,最典型是mapreduce(计算中间结果会大量落地磁盘, 性能较慢, 但很稳定) 后续的Spark, 除了shuffle时会落地磁盘,其他场景不会, 这样尽可能将中间结果放在内存, 计算会快很多,因为减少了和磁盘IO的性能消耗 再后续的Flink, 以及OLAP领域的内存计算引擎如Impala, Presto等都尽可能在计算时不做磁盘IO
  • Flink 从1.7 到1.12版本升级汇总
    . 一 .前言二 .Flink 1.7 版本2.1. Flink中的Scala 2.12支持2.2. 状态变化2.3. Exactly-once语义的S3 StreamingFileSink2.4. Streaming SQL中支持MATCH_RECOGNIZE2.5. Streaming SQL中的 Temporal Tables 和 Temporal Joins2.6. 版本化REST API2.7. Kafka 2.0 Connector2.8. 本地恢复2.9. 删除Flink的传统模式 三 .Flink 1.8 版本3.1. 使用TTL(生存时间)连续增量清除旧的Key状态3.2. 恢复保存点时对模式迁移的新支持3.3. 保存点兼容性3.4. RocksDB版本冲突并切换到FRocksDB(FLINK-10471)3.5. Maven 依赖3.6. TaskManager配置(FLINK-11716)3.7. Table API 的变动3.8. 连接器变动 四 .Flink 1.9 版本4.1. 细粒度批作业恢复 (FLIP-1)4.2. State Processor API (FLIP-43)4.3. Stop-with-Savepoint (FLIP-34)4.4. 重构 Flink WebUI4.5. 新 Blink SQL 查询处理器预览4.6. Table
  • flink python 从csv读取,写入csv
    环境 centos 6.5Python 2.7CDH 5.15flink 1.9 获得pyflink库 pyflink库在flink安装路径opt/python下 $ cd /usr/local/flink/opt/python $ cp pyflink.zip py4j-0.10.8.1-src.zip /opt/test $ cd /opt/test $ unzip pyflink.zip $ unzip py4j-0.10.8.1-src.zip 程序架构 创建TableEnvironment,定义planner,batch或streaming注册输入表注册输出表从Table API 或SQL 查询创建表发送查询结果到TableSink执行 flink流式查询 准备源数据 $ vi streaming.csv 1, 'hi', 'hello' 2, 'hi', 'hello' 流式查询 $ vi test.py from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes, CsvTableSource, CsvTableSink # Create a
  • 不会java但是想用flink,会python就够了 !PyFlink【二、深入学习pyflink源码】
    一、PyFlink源码结构 pyflink版本号: 1.10.1 安装方法: pip install apache-flink==1.10.1 python版本要求: 截止到2020年6月7号,官方要求至少为3.5,最高为3.7 找到你当前环境中的pyflink包,一般在你的python目录中的site-package里边找到pyflink 1.1、结构说明 截图说明: pyflink下的目录结构如下 ─➤ tree . ├── README.txt ├── __init__.py ├── __pycache__ │ ├── __init__.cpython-36.pyc │ ├── find_flink_home.cpython-36.pyc │ ├── gen_protos.cpython-36.pyc │ ├── java_gateway.cpython-36.pyc │ ├── serializers.cpython-36.pyc │ ├── shell.cpython-36.pyc │ └── version.cpython-36.pyc ├── bin │ ├── bash-java-utils.jar │ ├── config.sh │ ├── find-flink-home.sh │ ├── flink │ ├── flink-console.sh │ ├─”
  • flink1.12.0发布了,他来了他来了
    Apache Flink 社区很荣幸地宣布 Flink 1.12.0 版本正式发布!近 300 位贡献者参与了 Flink 1.12.0 的开发,提交了超过 1000 多个修复或优化。这些修改极大地提高了 Flink 的可用性,并且简化(且统一)了 Flink 的整个 API 栈。其中一些比较重要的修改包括: 在 DataStream API 上添加了高效的批执行模式的支持。这是批处理和流处理实现真正统一的运行时的一个重要里程碑。 实现了基于Kubernetes的高可用性(HA)方案,作为生产环境中,ZooKeeper方案之外的另外一种选择。 扩展了 Kafka SQL connector,使其可以在 upsert 模式下工作,并且支持在 SQL DDL 中处理 connector 的 metadata。现在,时态表 Join 可以完全用 SQL 来表示,不再依赖于 Table API 了。 PyFlink 中添加了对于 DataStream API 的支持,将 PyFlink 扩展到了更复杂的场景,比如需要对状态或者定时器 timer 进行细粒度控制的场景。除此之外,现在原生支持将 PyFlink 作业部署到 Kubernetes上。 本文描述了所有主要的新功能、优化、以及需要特别关注的改动。 Flink 1.12.0 的二进制发布包和源代码可以通过 Flink
  • flink各版本变化和新增特性
    1.6新特性 Flink 1.6-有状态流处理的下一步 在Flink 1.6.0中,我们继续在较早版本中进行的基础工作:使Flink用户能够无缝地运行快速数据处理并毫不费力地构建数据驱动的数据密集型应用程序。 Flink的状态支持是使Flink在实现各种用例时如此通用和强大的关键功能之一。为了使其更容易,社区增加了对状态TTL的本地支持(FLINK-9510,FLINK-9938)。此功能允许在状态过期后对其进行清理。现在,通过Flink 1.6.0,计时器状态可以通过将相关状态存储在RocksDB中而退出内核(FLINK-9485)。最后但并非最不重要的一点是,我们还显着改进了计时器(FLINK-9423)的删除。 使用Flink 1.5.0,我们对Flink的分布式体系结构进行了重新设计,以增加对资源弹性和不同部署方案的支持,尤其是更好的容器集成。在Flink 1.6.0中,我们继续进行一些未完成的工作:现在,所有外部通信(包括作业提交)都基于HTTP / REST(FLINK-9280),从而大大简化了容器的设置。Flink 1.6.0还附带了一个容器入口点(FLINK-9488),该入口点可以轻松地引导容器化的作业集群。 流SQL是最具破坏性的功能之一,因为它使Flink更加易于访问。在Apache Flink 1.6.0中,社区进一步改进了SQL CLI(FLINK
  • pyflink作业提交的踩坑过程,看完少走两个星期弯路
    flink在努力地将Python 生态和大数据生态融合,但目前的版本还不够成熟,尤其是在官方对python现有资料有限的情况下,用户想要使用python完成一个flink job并提交到flink平台上,还是有很多雷需要踩的。 以下对pyflink环节问题,python job编写到提交做了总结,可减少不必要的弯路。 一、部署环境 JDK 1.8+ & Python 3.5+ (3.7.6) & apache-flink 1.12 & pyflink 1.0 二、官方API flink为用户提供了多层API,对于python用户,主要使用Table API 和 SQL API,个人认为Table API有一点点类似python的Dataframe,故主要使用Table API完成作业开发。用户可以参考对应版本的官方文档和示例代码学习使用。 注:这里建议一定要看官方文档,因为目前pyflink版本之间差别较大,随便搜的资料由于版本差异会造成很多不必要的麻烦。 三、环境理解 在Table API层,flink提供了3类环境和两类 planner,用户需要理解环境之间的区别和属性,以便使用正确的环境和刚好地理解一些代码参数。 简单说:TableEnviroment实现了流批一体,但不支持UDF;StreamTableEnviroment
  • 来!PyFlink 作业的多种部署模式
    关于 PyFlink 的博客我们曾介绍过 PyFlink 的功能开发,比如,如何使用各种算子(Join/Window/AGG etc.),如何使用各种 Connector(Kafka, CSV, Socket etc.),还有一些实际的案例。这些都停留在开发阶段,一旦开发完成,我们就面临激动人心的时刻,那就是将我们精心设计开发的作业进行部署,那么问题来了,你知道怎样部署 PyFlink 的作业吗? 关于 PyFlink 的博客我们曾介绍过 PyFlink 的功能开发,比如,如何使用各种算子(Join/Window/AGG etc.),如何使用各种 Connector(Kafka, CSV, Socket etc.),还有一些实际的案例。这些都停留在开发阶段,一旦开发完成,我们就面临激动人心的时刻,那就是将我们精心设计开发的作业进行部署,那么问题来了,你知道怎样部署 PyFlink 的作业吗? 本文将为大家全面介绍部署 PyFlink 作业的各种模式。 组件栈回顾 上面的组件栈除了 PyFlink 是第一次添加上去,其他部分大家应该非常熟悉了。目前 PyFlink 基于 Java 的 Table API 之上,同时在 Runtime 层面有 Python 的算子和执行容器。那么我们聚焦重点,看最底层的 Deploy 部分,上图我们分成了三种部署模式,Local/Cluster
  • 不会java但是想用flink,会python就够了 !PyFlink【一、进入pyflink的世界】
    专栏目标 通过一个代码样例开始使用pyflink通过阅读pyflink的源码,逐步了解flink的python接口实现 本文使用的flink版本和pyflink版本基于1.10.1 初识Flink flink作为当前最流行的流批统一的数据计算处理框架,其开箱即用的部署方式(standalone)对于刚刚接触flink的人来说是非常友好和吸引人的。你可以通过地址找到你想要的版本,也可以直接下载编译好的包来进行下载,当然scala源码包也可以下载 flink的部署非常简单,如果你下载好了,你可以直接切换到解压后的目录下 并执行./bin/start-cluster.sh,默认端口为8080 浏览器打开访问一下试试,因为我这边8080被占用,flink会自动往后使用端口,所以我这边是8081 邂逅PyFlink PyFlink 是什么?这个问题也许会让人感觉问题的答案太明显了,那就是 Flink + Python,也就是 Flink on Python。那么到底 Flink on Python 意味着这什么呢?那么一个非常容易想到的方面就是能够让 Python 用享受到 Flink 的所有功能。其实不仅如此,PyFlink 的存在还有另外一个非常重要的意义就是,Python on Flink,我们可以将 Python 丰富的生态计算能力运行在 Flink 框架之上,这将极大的推动
  • flink-1.11 pyflink 部署文档
    本文主要参考了 孙金城 大佬的下面几篇文章: Apache Flink 说道系列 - PyFlink 作业的多种部署模式 Three Min Series - How to using PyFlink Shell 代码参考:https://github.com/pyflink/playgrounds 结合自己测试过程,有些地方做了修改,做一个记录。 1.从源码编译 flink 如果已经获得 flink 二进制包,可以跳过该步骤。 可以参考官方文档 Build Flink 注意环境要求: = Maven 3.2.5 = Java 8u51 使用下面命令编译: git clone https://github.com/apache/flink mvn clean install -DskipTests 2. 从源码编译 pyflink 如果已经获得 pyflink 二进制包,或者使用 pip 安装,可以跳过该步骤。 参考官方文档 Build PyFlink 需要注意的是,需要在 Python 3.5+ 环境下编译。 注意:需要 Python 3.5+。 进入 flink 源码目录,执行以下命令: cd flink-python; python setup.py sdist bdist_wheel 上面的命令可以生成 sdist 包和 bdist_wheel 包。生成目录为 flink
  • PyFlink 矢量化 UDF 抛出 NullPointerException(PyFlink Vectorized UDF throws NullPointerException)
    问题
  • PyFlink Vectorized UDF throws NullPointerException
    I have a ML model that takes two numpy.ndarray - users and items - and returns an numpy.ndarray predictions. In normal Python code, I would do: model = load_model() df = load_data() # the DataFrame includes 4 columns, namely, user_id, movie_id, rating, and timestamp users = df.user_id.values items = df.movie_id.values predictions = model(users, items) I am looking into porting this code into Flink to leverage its distributed nature. My assumption is: by distributing the prediction workload on multiple Flink nodes, I should be able to run the whole prediction faster. So I compose a PyFlink job
  • PyFlink中使用kafka和MySQL
    PyFlink中使用kafka和MySQL 文章目录 PyFlink中使用kafka和MySQL1 需求配置2 MySQL的安装与配置2.1 配置yum源2.2 安装MySQL源2.3 检查MySQL源是否安装成功2.4 安装MySQL2.5 启动MySQL服务2.6 查看MySQL的状态2.7 查看初始密码2.7 以初始密码登录MySQL2.8 修改root本地登录密码 3 单机系统下的kafka安装与配置3.1 下载kafka压缩包3.2 解压kafka到指定目录3.3 修改server.properties3.4 启动zookeeper3.5 启动kafka3.6 创建topic3.7 删除topic3.8 查看当前系统中所有的topic3.9 启动生产者3.10 启动消费者 4 kafka和MySQL在PyFlink中的使用4.1 环境配置4.2 程序代码4.3 程序运行流程4.4 启动kafka的生产者4.5 遇到的问题 Reference 1 需求配置 系统:Centos Java环境:Java8 Pyflink-1.10.1 kafka_2.13-2.4.0 MySQL 8.0.21 2 MySQL的安装与配置 在PyFlink中使用MySQL,我们要先对MySQL进行安装和配置 2.1 配置yum源 在MySQL官网中下载YUM源rpm安装包: http://dev
  • 一文带你了解最新发布的Apache Flink 1.11.0
    来源 | Apache Flink 官方博客 翻译 | 高赟(云骞) Apache Flink 社区很荣幸的宣布 Flink 1.11.0 版本正式发布!超过 200 名贡献者参与了 Flink 1.11.0 的开发,提交了超过 1300 个修复或优化。这些修改极大的提高了 Flink 的可用性,并且增强了各个 API 栈的功能。其中一些比较重要的修改包括: 核心引擎部分引入了非对齐的 Checkpoint 机制。这一机制是对 Flink 容错机制的一个重要改进,它可以提高严重反压作业的 Checkpoint 速度。 实现了一套新的 Source 接口。通过统一流和批作业 Source 的运行机制,提供常用的内部实现如事件时间处理,watermark 生成和空闲并发检测,这套新的 Source 接口可以极大的降低实现新的 Source 时的开发复杂度。 Flink SQL 引入了对 CDC(Change Data Capture,变动数据捕获)的支持,它使 Flink 可以方便的通过像 Debezium 这类工具来翻译和消费数据库的变动日志。Table API 和 SQL 也扩展了文件系统连接器对更多用户场景和格式的支持,从而可以支持将流式数据从 Kafka 写入 Hive 等场景。 PyFlink 优化了多个部分的性能,包括对向量化的用户自定义函数(Python UDF)的支持
  • 一文教你快速上手PyFlink
    简介:本文介绍了PyFlink项目的目标和发展历程,以及PyFlink目前的核心功能,包括Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依赖管理和Python UDF执行优化,同时也针对功能展示了相关demo。 作者|付典 本文介绍了PyFlink项目的目标和发展历程,以及PyFlink目前的核心功能,包括Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依赖管理和Python UDF执行优化,同时也针对功能展示了相关demo。本文主要分为4个部分: PyFlink介绍PyFlink相关功能PyFlink功能演示PyFlink下一步规划 PyFlink介绍 PyFlink是Flink的一个子模块,也是整个Flink项目的一部分,主要目的是提供Flink的Python语言支持。因为在机器学习和数据分析等领域,Python语言非常重要,甚至是最主要的开发语言。所以,为了满足更多用户需求,拓宽Flink的生态,我们启动了PyFlink项目。 PyFlink项目的目标主要有两点,第一点是将Flink的计算能力输出给Python用户,也就是我们会在Flink中提供一系列的Python API
  • 如何为 flink docker 容器配置一些外部 jars 库(how to configure some external jars library to the flink docker container)
    问题 我正在运行具有以下配置的 flink docker 映像。 version: '2.1' services: jobmanager: build: . image: flink volumes: - .:/usr/local/lib/python3.7/site-packages/pyflink/lib hostname: "jobmanager" expose: - "6123" ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager: image: flink volumes: - .:/usr/local/lib/python3.7/site-packages/pyflink/lib expose: - "6121" - "6122" depends_on: - jobmanager command: taskmanager links: - jobmanager:jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager 运行日志如下 taskmanager_1 | 2020-10-11 10:34:03,714 INFO org.apache.flink
  • how to configure some external jars library to the flink docker container
    I am running a flink docker image with the following configuration. version: '2.1' services: jobmanager: build: . image: flink volumes: - .:/usr/local/lib/python3.7/site-packages/pyflink/lib hostname: "jobmanager" expose: - "6123" ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager: image: flink volumes: - .:/usr/local/lib/python3.7/site-packages/pyflink/lib expose: - "6121" - "6122" depends_on: - jobmanager command: taskmanager links: - jobmanager:jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager the running log is as below
  • 如何从 0 到 1 开发 PyFlink API 作业
    简介:以 Flink 1.12 为例,介绍如何使用 Python 语言,通过 PyFlink API 来开发 Flink 作业。 Apache Flink 作为当前最流行的流批统一的计算引擎,在实时 ETL、事件处理、数据分析、CEP、实时机器学习等领域都有着广泛的应用。从 Flink 1.9 开始,Apache Flink 社区开始在原有的 Java、Scala、SQL 等编程语言的基础之上,提供对于 Python 语言的支持。经过 Flink 1.9 ~ 1.12 以及即将发布的 1.13 版本的多个版本的开发,目前 PyFlink API 的功能已经日趋完善,可以满足绝大多数情况下 Python 用户的需求。接下来,我们以 Flink 1.12 为例,介绍如何使用 Python 语言,通过 PyFlink API 来开发 Flink 作业。内容包括: 环境准备作业开发作业提交问题排查总结 GitHub 地址https://github.com/apache/flink欢迎大家给 Flink 点赞送 star~ 环境准备 第一步:安装 Python PyFlink 仅支持 Python 3.5+,您首先需要确认您的开发环境是否已安装了 Python 3.5+,如果没有的话,首先需要安装 Python 3.5+。 第二步:安装 JDK 我们知道 Flink 的运行时是使用
  • flink一文全开发周期
    摘要:Apache Flink 作为当前最流行的流批统一的计算引擎,在实时 ETL、事件处理、数据分析、CEP、实时机器学习等领域都有着广泛的应用。从 Flink 1.9 开始,Apache Flink 社区开始在原有的 Java、Scala、SQL 等编程语言的基础之上,提供对于 Python 语言的支持。经过 Flink 1.9 ~ 1.12 以及即将发布的 1.13 版本的多个版本的开发,目前 PyFlink API 的功能已经日趋完善,可以满足绝大多数情况下 Python 用户的需求。接下来,我们以 Flink 1.12 为例,介绍如何使用 Python 语言,通过 PyFlink API 来开发 Flink 作业。内容包括: 环境准备 作业开发 作业提交 问题排查 总结 Tips:点击文末「阅读原文」查看更多技术干货~ GitHub 地址 https://github.com/apache/flink 欢迎大家给 Flink 点赞送 star~ 一、环境准备 第一步:安装 Python PyFlink 仅支持 Python 3.5+,您首先需要确认您的开发环境是否已安装了 Python 3.5+,如果没有的话,需要先安装 Python 3.5+。 第二步:安装 JDK 我们知道 Flink 的运行是使用 Java 语言开发的,所以为了执行 Flink 作业,您还需要安装
  • python连接mysql_Flink 使用python连接mysql
    Flink 使用python连接mysql mysql连接配置依赖包 1.下载flink-connector-jdbc_2.11 jar包 网络路径如下 2.将下载jar包放到/flink-1.11.2/lib下 flink-connector-jdbc_2.11:flink-connector-jdbc_2.11-1.11.2.jar msql的驱动jar包: mysql-connector-java-5.1.47.jar 3.将下载jar包放到/python/site-packages/pyflink/lib下 cp flink-connector-jdbc_2.11-1.11.2.jar /home/hadoop/.local/share/virtualenvs/pycharm_project_305-MIKSrtht/lib/python3.7/site-packages/pyflink/lib cp mysql-connector-java-5.1.47.jar /home/hadoop/.local/share/virtualenvs/pycharm_project_305-MIKSrtht/lib/python3.7/site-packages/pyflink/lib 我们来看个flink连接mysql 的例子 1.准备数据 CREATE TABLE `my