// Input event consumed by the job
class UserPurchase {
    String userId;
    long timestamp;
    Item item;
}

// Operator "UserItems": a KeyedProcessFunction with uid "user_items" and the following keyed state
ValueState<String> userId;
ValueState<Long> lastSeenTimestamp;
ListState<Item> purchasedItems;
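It may help to see how the `user_items` operator's keyed state could evolve as `UserPurchase` events arrive. The sketch below is plain Java with no Flink dependency. A `HashMap` keyed by `userId` stands in for Flink's per-key state scoping. Several details are assumptions, since the original `processElement` logic is not shown: the `Item` fields (`name`, `category`), the class names, and the update rule (overwrite `lastSeenTimestamp`, append the item).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UserItemsStateSketch {

    // Hypothetical Item fields; the listing does not define Item.
    static final class Item {
        final String name;
        final String category;
        Item(String name, String category) {
            this.name = name;
            this.category = category;
        }
    }

    // Mirrors UserPurchase { String userId; long timestamp; Item item; }
    static final class UserPurchase {
        final String userId;
        final long timestamp;
        final Item item;
        UserPurchase(String userId, long timestamp, Item item) {
            this.userId = userId;
            this.timestamp = timestamp;
            this.item = item;
        }
    }

    // The three state handles of one key, modeled as plain fields.
    static final class PerKeyState {
        String userId;                                        // ValueState<String>
        Long lastSeenTimestamp;                               // ValueState<Long>
        final List<Item> purchasedItems = new ArrayList<>();  // ListState<Item>
    }

    // Flink scopes state to the current key; this map plays that role.
    private final Map<String, PerKeyState> stateByKey = new HashMap<>();

    // Assumed update rule for one purchase event.
    void processElement(UserPurchase purchase) {
        PerKeyState state = stateByKey.computeIfAbsent(purchase.userId, key -> new PerKeyState());
        state.userId = purchase.userId;
        state.lastSeenTimestamp = purchase.timestamp;
        state.purchasedItems.add(purchase.item);
    }

    PerKeyState stateFor(String userId) {
        return stateByKey.get(userId);
    }

    public static void main(String[] args) {
        UserItemsStateSketch operator = new UserItemsStateSketch();
        operator.processElement(new UserPurchase("alice", 100L, new Item("book", "media")));
        operator.processElement(new UserPurchase("bob", 120L, new Item("pen", "office")));
        operator.processElement(new UserPurchase("alice", 250L, new Item("lamp", "home")));

        PerKeyState alice = operator.stateFor("alice");
        System.out.println(alice.lastSeenTimestamp + " " + alice.purchasedItems.size()); // 250 2
    }
}
```

Each key owns an independent copy of the three values. This independence is what lets the State Processor API later read and rewrite the state one user at a time.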
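ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// RocksDBStateBackend has no no-arg constructor; the checkpoint URI below is a placeholder
ExistingSavepoint existingSavepoint = Savepoint.load(
    env, "hdfs://path/", new RocksDBStateBackend("hdfs://path/to/checkpoints"));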
DataSet<UserState> userStates = existingSavepoint.readKeyedState("user_items", new UserKeyedStateReaderFunction());

// Reads the keyed state of the "user_items" operator; readKey is called once per key (userId)
public class UserKeyedStateReaderFunction extends KeyedStateReaderFunction<String, UserState> {

    private ValueState<String> userId;
    private ValueState<Long> lastSeenTimestamp;
    private ListState<Item> purchasedItems;

    @Override
    public void open(Configuration configuration) throws Exception {
        // Register the same state descriptors (names and types) that the original job used
        this.userId = getRuntimeContext().getState(...);
        this.lastSeenTimestamp = getRuntimeContext().getState(...);
        this.purchasedItems = getRuntimeContext().getListState(...);
    }

    @Override
    public void readKey(String key, Context ctx, Collector<UserState> out) throws Exception {
        // The state handles are already scoped to `key`; emit one UserState per user
        out.collect(new UserState(userId.get(), lastSeenTimestamp.get(), purchasedItems.get()));
    }
}
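Conceptually, `readKeyedState` visits every key stored for the `user_items` operator and scopes the state handles to that key. It then calls `readKey`, and every record passed to the collector becomes an element of the resulting `DataSet`. The Flink-free sketch below imitates that contract. A `TreeMap` holds hypothetical savepoint contents, and a `Consumer` plays the collector. The class names are illustrative, and this is not how Flink iterates state internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

public class ReadKeyedStateSketch {

    // Hypothetical stored state for one key: timestamp plus purchased item names.
    static final class StoredState {
        final Long lastSeenTimestamp;
        final List<String> purchasedItems;
        StoredState(Long lastSeenTimestamp, List<String> purchasedItems) {
            this.lastSeenTimestamp = lastSeenTimestamp;
            this.purchasedItems = purchasedItems;
        }
    }

    // Output record, analogous to the UserState built in readKey.
    static final class UserState {
        final String userId;
        final Long lastSeenTimestamp;
        final List<String> purchasedItems = new ArrayList<>();
        UserState(String userId, Long lastSeenTimestamp, Iterable<String> items) {
            this.userId = userId;
            this.lastSeenTimestamp = lastSeenTimestamp;
            for (String item : items) {  // copy, since ListState.get() only yields an Iterable
                this.purchasedItems.add(item);
            }
        }
    }

    // Per-key reader: sees only the state of `key` (which doubles as userId here).
    static void readKey(String key, StoredState state, Consumer<UserState> out) {
        out.accept(new UserState(key, state.lastSeenTimestamp, state.purchasedItems));
    }

    // Driver: one readKey call per stored key; collected records form the result.
    static List<UserState> readKeyedState(Map<String, StoredState> savepoint) {
        List<UserState> result = new ArrayList<>();
        for (Map.Entry<String, StoredState> entry : savepoint.entrySet()) {
            readKey(entry.getKey(), entry.getValue(), result::add);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, StoredState> savepoint = new TreeMap<>();
        savepoint.put("alice", new StoredState(250L, Arrays.asList("book", "lamp")));
        savepoint.put("bob", new StoredState(120L, Arrays.asList("pen")));

        for (UserState s : readKeyedState(savepoint)) {
            System.out.println(s.userId + " " + s.lastSeenTimestamp + " " + s.purchasedItems);
        }
        // alice 250 [book, lamp]
        // bob 120 [pen]
    }
}
```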
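ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// RocksDBStateBackend has no no-arg constructor; the checkpoint URI below is a placeholder
ExistingSavepoint existingSavepoint = Savepoint.load(
    env, "hdfs://path/", new RocksDBStateBackend("hdfs://path/to/checkpoints"));

DataSet<UserState> userStates = existingSavepoint.readKeyedState("user_items", new UserKeyedStateReaderFunction());

// Count purchased items per category
userStates
    .flatMap(...)                 // one record per purchased item (function elided)
    .groupBy("itemCategory")
    .reduce(new ItemCounter())    // count the items in each category
    .writeAsText(...);            // placeholder sink: a DataSet job with no sink fails on env.execute()
env.execute();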
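The `flatMap` / `groupBy("itemCategory")` / `reduce(new ItemCounter())` pipeline counts purchased items per category. Below is the same computation over an in-memory list, written with `java.util.stream` instead of the DataSet API. The original elides the `flatMap` function and `ItemCounter`, so two points are assumptions: that `flatMap` emits one record per purchased item, and that `Item` has an `itemCategory` field.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ItemCategoryCount {

    // Hypothetical Item; only itemCategory matters for the count.
    static final class Item {
        final String name;
        final String itemCategory;
        Item(String name, String itemCategory) {
            this.name = name;
            this.itemCategory = itemCategory;
        }
    }

    static final class UserState {
        final String userId;
        final List<Item> purchasedItems;
        UserState(String userId, List<Item> purchasedItems) {
            this.userId = userId;
            this.purchasedItems = purchasedItems;
        }
    }

    static Map<String, Long> countByCategory(List<UserState> userStates) {
        return userStates.stream()
                .flatMap(state -> state.purchasedItems.stream())  // like flatMap: one element per item
                .collect(Collectors.groupingBy(
                        item -> item.itemCategory,                 // like groupBy("itemCategory")
                        TreeMap::new,                              // sorted keys for stable output
                        Collectors.counting()));                   // like reduce(new ItemCounter())
    }

    public static void main(String[] args) {
        List<UserState> states = Arrays.asList(
                new UserState("alice", Arrays.asList(new Item("book", "media"), new Item("lamp", "home"))),
                new UserState("bob", Arrays.asList(new Item("film", "media"))));
        System.out.println(countByCategory(states)); // {home=1, media=2}
    }
}
```

In the Flink job, the grouping and counting run distributed over the `DataSet` read from the savepoint. The result has the same shape: one count per category.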
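ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// RocksDBStateBackend has no no-arg constructor; the checkpoint URI below is a placeholder
ExistingSavepoint existingSavepoint = Savepoint.load(
    env, "hdfs://path/", new RocksDBStateBackend("hdfs://path/to/checkpoints"));

DataSet<UserState> userStates = existingSavepoint.readKeyedState("user_items", new UserKeyedStateReaderFunction());

// Correct the item categories
DataSet<UserState> correctedUserStates = userStates.map(new PurchasedItemCategoryPatcher());

// Key by the String userId (as the original job and the reader do) so the rewritten state lands under the same keys
BootstrapTransformation<UserState> bootstrapTransformation = OperatorTransformation
    .bootstrapWith(correctedUserStates)
    .keyBy(userState -> userState.userId)
    .transform(new UserKeyedStateBootstrapFunction());

// The uid already exists in the savepoint, so remove it before adding the corrected state under the same uid
existingSavepoint
    .removeOperator("user_items")
    .withOperator("user_items", bootstrapTransformation)
    .write("hdfs://path/for/corrected/savepoint");
env.execute();

// Writes each corrected UserState back into the keyed state of "user_items"
public class UserKeyedStateBootstrapFunction extends KeyedStateBootstrapFunction<String, UserState> {

    private ValueState<String> userId;
    private ValueState<Long> lastSeenTimestamp;
    private ListState<Item> purchasedItems;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Use the same state descriptors (names and types) as the original job
        this.userId = getRuntimeContext().getState(...);
        this.lastSeenTimestamp = getRuntimeContext().getState(...);
        this.purchasedItems = getRuntimeContext().getListState(...);
    }

    @Override
    public void processElement(UserState userState, Context ctx) throws Exception {
        this.userId.update(userState.userId);
        this.lastSeenTimestamp.update(userState.lastSeenTimestamp);
        for (Item purchasedItem : userState.purchasedItems) {
            this.purchasedItems.add(purchasedItem);
        }
    }
}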
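The listing maps each `UserState` through `PurchasedItemCategoryPatcher` but does not show it. One way such a patcher could look is sketched below in plain Java. It assumes the bug is a mislabeled item category and that a lookup table from item name to correct category is available. Both assumptions, and the `Item` fields, are hypothetical. In the Flink job the same logic would sit in a `MapFunction<UserState, UserState>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CategoryPatchSketch {

    // Hypothetical Item whose stored category may be wrong.
    static final class Item {
        final String name;
        final String itemCategory;
        Item(String name, String itemCategory) {
            this.name = name;
            this.itemCategory = itemCategory;
        }
    }

    static final class UserState {
        final String userId;
        final Long lastSeenTimestamp;
        final List<Item> purchasedItems;
        UserState(String userId, Long lastSeenTimestamp, List<Item> purchasedItems) {
            this.userId = userId;
            this.lastSeenTimestamp = lastSeenTimestamp;
            this.purchasedItems = purchasedItems;
        }
    }

    // Hypothetical patcher driven by an assumed item-name -> category correction table.
    static final class PurchasedItemCategoryPatcher {
        private final Map<String, String> correctCategoryByItemName;

        PurchasedItemCategoryPatcher(Map<String, String> correctCategoryByItemName) {
            this.correctCategoryByItemName = new HashMap<>(correctCategoryByItemName);
        }

        UserState map(UserState state) {
            List<Item> patched = new ArrayList<>();
            for (Item item : state.purchasedItems) {
                // Keep the stored category unless a correction exists for this item name.
                String category = correctCategoryByItemName.getOrDefault(item.name, item.itemCategory);
                patched.add(new Item(item.name, category));
            }
            // userId and lastSeenTimestamp are untouched, so the record is rewritten under the same key.
            return new UserState(state.userId, state.lastSeenTimestamp, patched);
        }
    }

    public static void main(String[] args) {
        Map<String, String> corrections = new HashMap<>();
        corrections.put("lamp", "home");
        PurchasedItemCategoryPatcher patcher = new PurchasedItemCategoryPatcher(corrections);

        UserState before = new UserState("alice", 250L,
                Arrays.asList(new Item("book", "media"), new Item("lamp", "media")));
        UserState after = patcher.map(before);

        for (Item item : after.purchasedItems) {
            System.out.println(item.name + " -> " + item.itemCategory);
        }
        // book -> media
        // lamp -> home
    }
}
```

Leaving the key field unchanged matters here. The bootstrap step keys the corrected records by `userId`, and the restored job will only find each user's state under its original key.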