度之住事,验之来事,参之平素,可则决之。一一《鬼谷子》
我们来一个Java
程序玩玩,执行以下命令,会自动帮你构建一个包含Flink
依赖的项目:
1
| mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-java -DarchetypeVersion=1.14.0 -DgroupId=frauddetection -DartifactId=frauddetection -Dversion=0.1 -Dpackage=spendreport -DinteractiveMode=false
|
执行完毕后
打开项目
将这两处的<scope>provided</scope>
注释掉或者删掉也行
简单运行一下,可以看到我们控制台一直输出执行日志
这就是官方提供的简单Demo
,运行了一下,我试着依葫芦画瓢自己写了一个
按照我看过的这篇文章
实现了一个咖啡馆实时积分,送奖品的逻辑
假设,你把“世界这么大,老子去看看”的辞职信甩到了老板脸上,回老家科尔沁草原正中央开了一家咖啡馆,名叫“马屎咖啡”。
创业不易,需要绞尽脑汁吸引顾客。于是你搞了一个促销活动,一杯咖啡积1分,每个顾客只要积够10分,就白送一次骑马体验。
你店里卖的东西种类越来越多:10种咖啡,8种点心,还有6种正餐。
与此同时,你的促销活动也变得更复杂:一份咖啡积1分,一份点心积0.5分,一份正餐积2分。
一周内积够10分才送骑马,七天之前的过期积分就作废。
于是你做了第三个改进:补充一个实时计算员。
这里具体使用flink
实现:
第一行的 StreamExecutionEnvironment
用于设置你的执行环境。 任务执行环境用于定义任务的属性、创建数据源以及最终启动任务的执行。
1 2
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
数据源从外部系统例如 Apache Kafka、Rabbit MQ
或者 Apache Pulsar
接收数据,然后将数据送到 Flink
程序中。 这个代码练习使用的是一个能够无限循环生成订单模拟交易数据的数据源。 每条交易数据包括了 所属用户(username
),交易发生的时间 (createTime
) 以及货物(goods
)。 绑定到数据源上的 name
属性是为了调试方便,如果发生一些异常,我们能够通过它快速定位问题发生在哪里。
1
| DataStream<Order> orders = env.addSource(new OrderSource()).name("orders");
|
OrderSource
中构造了一个new RateLimitedIterator<>(Customers.unbounded())
传入父构造器
并且在next
中sleep
了100
毫秒,给程序一个缓冲时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| import org.apache.flink.annotation.Public; import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
import java.io.Serializable; import java.util.Iterator;
@Public public class OrderSource extends FromIteratorFunction<Order> { private static final long serialVersionUID = -3541830323543628337L;
public OrderSource() { super(new RateLimitedIterator<>(Customers.unbounded())); }
private static class RateLimitedIterator<T> implements Iterator<T>, Serializable {
private static final long serialVersionUID = 1L;
private final Iterator<T> inner;
private RateLimitedIterator(Iterator<T> inner) { this.inner = inner; }
@Override public boolean hasNext() { return inner.hasNext(); }
@Override public T next() { try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } return inner.next(); } } }
|
这里Customers.unbounded()
是一个Iterator
迭代器,里面模拟了用户点单情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| import cn.hutool.core.util.RandomUtil; import com.github.javafaker.Faker;
import java.io.Serializable; import java.util.Arrays; import java.util.Iterator; import java.util.List;
public class Customers implements Iterator<Order>, Serializable { private static final long serialVersionUID = 6202217638794440645L;
private static final List<Goods> GOODS_LIST = Arrays.asList( Goods.builder().name("卡布奇诺").goodsType(GoodsTypeEnum.COFFEE).money(35.0).build() , Goods.builder().name("卡布奇诺中杯").goodsType(GoodsTypeEnum.COFFEE).money(37.0).build() , Goods.builder().name("卡布奇诺大杯").goodsType(GoodsTypeEnum.COFFEE).money(39.0).build() , Goods.builder().name("拿铁").goodsType(GoodsTypeEnum.COFFEE).money(30.0).build() , Goods.builder().name("美式").goodsType(GoodsTypeEnum.COFFEE).money(28.0).build() , Goods.builder().name("星冰乐").goodsType(GoodsTypeEnum.COFFEE).money(30.0).build() , Goods.builder().name("焦糖玛奇朵").goodsType(GoodsTypeEnum.COFFEE).money(32.0).build() , Goods.builder().name("华夫饼").goodsType(GoodsTypeEnum.DESSERT).money(19.5).build() , Goods.builder().name("草莓蛋糕").goodsType(GoodsTypeEnum.DESSERT).money(35.5).build() , Goods.builder().name("菠萝包").goodsType(GoodsTypeEnum.DESSERT).money(16.5).build() , Goods.builder().name("法可颂").goodsType(GoodsTypeEnum.DESSERT).money(8.5).build() , Goods.builder().name("意面").goodsType(GoodsTypeEnum.MEAL).money(30.0).build() , Goods.builder().name("意面").goodsType(GoodsTypeEnum.MEAL).money(30.0).build() , Goods.builder().name("水果蔬菜沙拉").goodsType(GoodsTypeEnum.MEAL).money(40.0).build() , Goods.builder().name("鸡肉卷").goodsType(GoodsTypeEnum.MEAL).money(35.0).build() , Goods.builder().name("牛肉卷").goodsType(GoodsTypeEnum.MEAL).money(35.0).build() , Goods.builder().name("羊肉卷").goodsType(GoodsTypeEnum.MEAL).money(35.0).build() , Goods.builder().name("鱼肉卷").goodsType(GoodsTypeEnum.MEAL).money(35.0).build() );
public static Iterator<Order> unbounded() { return new Customers(); }
@Override public boolean hasNext() { return true; }
@Override public Order next() { return Order.builder().username(Faker.instance().name().firstName()).id(RandomUtil.randomLong(0, Long.MAX_VALUE)).goods(RandomUtil.randomEles(GOODS_LIST, RandomUtil.randomInt(1, 10))).createTime(RandomUtil.randomDay(-7, 0)).build(); }
}
|
orders
这个数据流包含了大量的用户交易数据,需要被划分到多个并发上进行计分处理。由于计分行为的发生是基于某一个用户的,所以,必须要要保证同一个账户的所有交易行为数据要被同一个并发的 task 进行处理。
为了保证同一个 task 处理同一个 key 的所有数据,你可以使用 DataStream#keyBy
对流进行分区。 process()
函数对流绑定了一个操作,这个操作将会对流上的每一个消息调用所定义好的函数。 通常,一个操作会紧跟着 keyBy
被调用,在这个例子中,这个操作是Calculator
,该操作是在一个 keyed context 上执行的。
简单来说,下面这段代码,用username
去隔离用户之间的数据,用Calculator
去执行具体操作
1
| DataStream<HorseTicket> alerts = orders.keyBy(Order::getUsername).process(new Calculator()).name("calculator");
|
我们细看Calculator
,它是我们具体计分逻辑的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.core.lang.Console; import cn.hutool.core.lang.Opt; import cn.hutool.core.util.NumberUtil; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector;
import java.io.IOException; import java.util.Arrays; import java.util.List;
public class Calculator extends KeyedProcessFunction<String, Order, HorseTicket> { public static final int NUMBER_LIMIT = 10; public static final int WEEK_DAY_LIMIT = 7; private static final long serialVersionUID = -6248614038880488977L; private transient ValueState<List<Score>> scoreBoard;
@Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<List<Score>> scoreDescriptor = new ValueStateDescriptor<>( "scoreboard", Types.LIST(TypeInformation.of(Score.class))); scoreBoard = getRuntimeContext().getState(scoreDescriptor); }
@Override public void processElement(Order order, Context ctx, Collector<HorseTicket> out) throws Exception { List<Score> scores = Opt.ofNullable(this.scoreBoard.value()).orElseGet(CollUtil::newArrayList); Console.log(order); Double score = order.getGoods().parallelStream().map(Goods::getGoodsType).map(GoodsTypeEnum::getScore).reduce((double) 0, NumberUtil::add, NumberUtil::add); Console.log("{},Your score is {} at {}.", order.getUsername(), score, DateUtil.formatDateTime(order.getCreateTime()));
scores.add(Score.builder().score(score).time(order.getCreateTime()).build()); if (scores.parallelStream().filter(s -> DateUtil.between(s.getTime(), DateUtil.date(), DateUnit.DAY) <= WEEK_DAY_LIMIT).map(Score::getScore).reduce((double) 0, NumberUtil::add, NumberUtil::add) >= NUMBER_LIMIT) { driveHorse(out, order, scores.toArray(new Score[0])); return; }
this.scoreBoard.update(scores); }
private void driveHorse(Collector<HorseTicket> out, Order order, Score... scores) throws IOException { out.collect(HorseTicket.builder().username(order.getUsername()).scores(Arrays.asList(scores)).build()); this.scoreBoard.clear(); }
}
|
然后sink
会将 DataStream
写出到外部系统,例如 Apache Kafka、Cassandra 或者 AWS Kinesis 等。 Racecourse
使用 ERROR 的日志级别打印每一个 HorseTicket
的数据记录,而不是将其写入持久存储,以便你可以方便地查看结果。
1 2
| alerts.addSink(new Racecourse()).name("racecourse");
|
Racecourse
具体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import cn.hutool.core.lang.Console; import org.apache.flink.streaming.api.functions.sink.SinkFunction;
public class Racecourse implements SinkFunction<HorseTicket> { private static final long serialVersionUID = 7327454374125251434L;
@Override public void invoke(HorseTicket horseTicket, Context context) throws Exception { Console.error("{} is driving horse!And his scores are {}.", horseTicket.getUsername(), horseTicket.getScores()); } }
|
Flink
程序是懒加载的,并且只有在完全搭建好之后,才能够发布到集群上执行。 调用 StreamExecutionEnvironment#execute
时给任务传递一个任务名参数,就可以开始运行任务。
1 2
| env.execute("Horse Shit Cafe");
|
完整代码我上传到了:
https://gitee.com/VampireAchao/simple-flink.git
执行后可以看到很多用户下单:
例如这位Stacey
第一次订单积分6
分
第二次积分4
分刚好10
分,并且两笔订单相差不超过一周,送去骑马体验