您现在的位置是:首页 > 编程 > 

Flink 实践练习

2025-07-27 21:38:32
Flink 实践练习 将 flink-training 项目导入到vscode中在vscode中使用git方法代码语言:bash复制git clone .git安装gradle插件这个和maven是类似的,但是更新,实现的功能更多也更简洁构建项目代码语言:bash复制./gradlew test shadowJar尝试实现筛选的的过滤方法每个项目里有exercise和solution,soluti

Flink 实践练习

将 flink-training 项目导入到vscode中

在vscode中使用git方法

代码语言:bash复制
git clone .git
安装gradle插件

这个和maven是类似的,但是更新,实现的功能更多也更简洁

构建项目代码语言:bash复制
./gradlew test shadowJar
尝试实现筛选的的过滤方法

每个项目里有exercise和solution,solution是已经实现的方法。

代码语言:java复制
package org.apache.soluti.ridecleansing;

import org.apache.flink.apimon.JobExecutionResult;
import org.apache.flink.apimon.functi.FilterFunction;
import org.apache.flink.streaming.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functi.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functi.sink.SinkFunction;
import org.apache.flink.streaming.api.functi.source.SourceFunction;
import org.apache.exercisesmon.datatypes.TaxiRide;
import org.apache.exercisesmon.sources.TaxiRideGenerator;
import org.apache.exercisesmon.utils.GeoUtils;

/**
 * Solution to the Ride Cleansing exercise from the Flink training.
 *
 * <p>The task of this exercise is to filter a data stream of taxi ride records to keep only rides
 * that both start and end within ew York City. The resulting stream should be printed.
 */
public class RideCleansingSolution {

    private final SourceFunction<TaxiRide> source;
    private final SinkFunction<TaxiRide> sink;

    /** Creates a job using the source and sink provided. */
    public RideCleansingSolution(SourceFunction<TaxiRide> source, SinkFunction<TaxiRide> sink) {

        this.source = source;
        this.sink = sink;
    }

    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        RideCleansingSolution job =
                new RideCleansingSolution(new TaxiRideGenerator(), new PrintSinkFunction<>());

        ();
    }

    /**
     * Creates and executes the long rides pipeline.
     *
     * @return {JobExecutionResult}
     * @throws Exception which occurs during job execution.
     */
    public JobExecutionResult execute() throws Exception {

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set up the pipeline
        env.addSource(source).filter(new YCFilter()).addSink(sink);

        // run the pipeline and return the result
        return ("Taxi Ride Cleansing");
    }

    /** Keep only those rides and both start and end in YC. */
    public static class YCFilter implements FilterFunction<TaxiRide> {
        @Override
        public boolean filter(TaxiRide taxiRide) {
            return GeoUtils.isInYC(taxiRide.startLon, taxiRide.startLat)
                    && GeoUtils.isInYC(, );
        }
    }
}

从代码中可以看到,source 和 sink已经实现,框架也已经实现好,需要做的只是实现fileter方法

也就是这里的内容

代码语言:java复制
    /** Keep only those rides and both start and end in YC. */
    public static class YCFilter implements FilterFunction<TaxiRide> {
        @Override
        public boolean filter(TaxiRide taxiRide) {
            return GeoUtils.isInYC(taxiRide.startLon, taxiRide.startLat)
                    && GeoUtils.isInYC(, );
        }
    }
}

#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格

本文地址:http://www.dnpztj.cn/biancheng/1217591.html

相关标签:无
上传时间: 2025-07-25 09:34:11
留言与评论(共有 6 条评论)
本站网友 预警机排名
11分钟前 发表
taxiRide.startLat) && GeoUtils.isInYC(
本站网友 石房网
14分钟前 发表
框架也已经实现好
本站网友 黄建翔
8分钟前 发表
需要做的只是实现fileter方法也就是这里的内容代码语言:java复制 /** Keep only those rides and both start and end in YC. */ public static class YCFilter implements FilterFunction<TaxiRide> { @Override public boolean filter(TaxiRide taxiRide) { return GeoUtils.isInYC(taxiRide.startLon
本站网友 泰安中雅快餐
24分钟前 发表
new PrintSinkFunction<>()); (); } /** * Creates and executes the long rides pipeline. * * @return {JobExecutionResult} * @throws Exception which occurs during job execution. */ public JobExecutionResult execute() throws Exception { // set up streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // set up the pipeline env.addSource(source).filter(new YCFilter()).addSink(sink); // run the pipeline and return the result return ("Taxi Ride Cleansing"); } /** Keep only those rides and both start and end in YC. */ public static class YCFilter implements FilterFunction<TaxiRide> { @Override public boolean filter(TaxiRide taxiRide) { return GeoUtils.isInYC(taxiRide.startLon
本站网友 上海日本料理自助餐
3分钟前 发表
taxiRide.startLat) && GeoUtils.isInYC(