Flink 实践练习
Flink 实践练习
将 flink-training 项目导入到vscode中在vscode中使用git方法代码语言:bash复制git clone .git安装gradle插件这个和maven是类似的,但是更新,实现的功能更多也更简洁构建项目代码语言:bash复制./gradlew test shadowJar尝试实现筛选的的过滤方法每个项目里有exercise和solution,soluti
Flink 实践练习
在vscode中使用git方法
代码语言:bash复制git clone .git
这个和maven是类似的,但是更新,实现的功能更多也更简洁
./gradlew test shadowJar
每个项目里有exercise和solution,solution是已经实现的方法。
代码语言:java复制package org.apache.soluti.ridecleansing;
import org.apache.flink.apimon.JobExecutionResult;
import org.apache.flink.apimon.functi.FilterFunction;
import org.apache.flink.streaming.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functi.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functi.sink.SinkFunction;
import org.apache.flink.streaming.api.functi.source.SourceFunction;
import org.apache.exercisesmon.datatypes.TaxiRide;
import org.apache.exercisesmon.sources.TaxiRideGenerator;
import org.apache.exercisesmon.utils.GeoUtils;
/**
* Solution to the Ride Cleansing exercise from the Flink training.
*
* <p>The task of this exercise is to filter a data stream of taxi ride records to keep only rides
* that both start and end within ew York City. The resulting stream should be printed.
*/
public class RideCleansingSolution {
private final SourceFunction<TaxiRide> source;
private final SinkFunction<TaxiRide> sink;
/** Creates a job using the source and sink provided. */
public RideCleansingSolution(SourceFunction<TaxiRide> source, SinkFunction<TaxiRide> sink) {
this.source = source;
this.sink = sink;
}
/**
* Main method.
*
* @throws Exception which occurs during job execution.
*/
public static void main(String[] args) throws Exception {
RideCleansingSolution job =
new RideCleansingSolution(new TaxiRideGenerator(), new PrintSinkFunction<>());
();
}
/**
* Creates and executes the long rides pipeline.
*
* @return {JobExecutionResult}
* @throws Exception which occurs during job execution.
*/
public JobExecutionResult execute() throws Exception {
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// set up the pipeline
env.addSource(source).filter(new YCFilter()).addSink(sink);
// run the pipeline and return the result
return ("Taxi Ride Cleansing");
}
/** Keep only those rides and both start and end in YC. */
public static class YCFilter implements FilterFunction<TaxiRide> {
@Override
public boolean filter(TaxiRide taxiRide) {
return GeoUtils.isInYC(taxiRide.startLon, taxiRide.startLat)
&& GeoUtils.isInYC(, );
}
}
}
从代码中可以看到,source 和 sink已经实现,框架也已经实现好,需要做的只是实现fileter方法
也就是这里的内容
代码语言:java复制 /** Keep only those rides and both start and end in YC. */
public static class YCFilter implements FilterFunction<TaxiRide> {
@Override
public boolean filter(TaxiRide taxiRide) {
return GeoUtils.isInYC(taxiRide.startLon, taxiRide.startLat)
&& GeoUtils.isInYC(, );
}
}
}
#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格
上传时间: 2025-07-25 09:34:11
推荐阅读
留言与评论(共有 6 条评论) |
本站网友 预警机排名 | 11分钟前 发表 |
taxiRide.startLat) && GeoUtils.isInYC( | |
本站网友 石房网 | 14分钟前 发表 |
框架也已经实现好 | |
本站网友 黄建翔 | 8分钟前 发表 |
需要做的只是实现fileter方法也就是这里的内容代码语言:java复制 /** Keep only those rides and both start and end in YC. */ public static class YCFilter implements FilterFunction<TaxiRide> { @Override public boolean filter(TaxiRide taxiRide) { return GeoUtils.isInYC(taxiRide.startLon | |
本站网友 泰安中雅快餐 | 24分钟前 发表 |
new PrintSinkFunction<>()); (); } /** * Creates and executes the long rides pipeline. * * @return {JobExecutionResult} * @throws Exception which occurs during job execution. */ public JobExecutionResult execute() throws Exception { // set up streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // set up the pipeline env.addSource(source).filter(new YCFilter()).addSink(sink); // run the pipeline and return the result return ("Taxi Ride Cleansing"); } /** Keep only those rides and both start and end in YC. */ public static class YCFilter implements FilterFunction<TaxiRide> { @Override public boolean filter(TaxiRide taxiRide) { return GeoUtils.isInYC(taxiRide.startLon | |
本站网友 上海日本料理自助餐 | 3分钟前 发表 |
taxiRide.startLat) && GeoUtils.isInYC( |