generated from databricks-industry-solutions/industry-solutions-blueprints
-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy path1_Python_Fine Grained Demand Forecasting.py
570 lines (439 loc) · 20.6 KB
/
1_Python_Fine Grained Demand Forecasting.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
# Databricks notebook source
# MAGIC %md
# MAGIC You may find this series of notebooks at https://github.com/databricks-industry-solutions/fine-grained-demand-forecasting. For more information about this solution accelerator, visit https://www.databricks.com/solutions/accelerators/demand-forecasting.
# COMMAND ----------
# MAGIC %md The objective of this notebook is to illustrate how we might generate a large number of fine-grained forecasts at the store-item level in an efficient manner leveraging the distributed computational power of Databricks. This is a Spark 3.x update to a previously published notebook which had been developed for Spark 2.x. **UPDATE** marks in this notebook indicate changes in the code intended to reflect new functionality in either Spark 3.x or the Databricks platform.
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC For this exercise, we will make use of an increasingly popular library for demand forecasting, [prophet](https://facebook.github.io/prophet/), which we will load into the notebook session using the %pip magic command.
# COMMAND ----------
# DBTITLE 1,Install Required Libraries
# MAGIC %pip install prophet
# COMMAND ----------
# MAGIC %md ## Step 1: Examine the Data
# MAGIC
# MAGIC For our training dataset, we will make use of 5-years of store-item unit sales data for 50 items across 10 different stores. This data set is publicly available as part of a past Kaggle competition and can be downloaded with the `./config/Data Extract` notebook with your own Kaggle credentials.
# MAGIC
# MAGIC Once downloaded, we can unzip the *train.csv.zip* file and upload the decompressed CSV to */FileStore/demand_forecast/train/* using the file import steps documented [here](https://docs.databricks.com/data/databricks-file-system.html#!#user-interface). With the dataset accessible within Databricks, we can now explore it in preparation for modeling:
# COMMAND ----------
# MAGIC %run "./config/Data Extract"
# COMMAND ----------
# DBTITLE 1,Access the Dataset
from pyspark.sql.types import *
# structure of the training data set
train_schema = StructType([
StructField('date', DateType()),
StructField('store', IntegerType()),
StructField('item', IntegerType()),
StructField('sales', IntegerType())
])
# read the training file into a dataframe
train = spark.read.csv(
'/tmp/solacc/demand_forecast/train/train.csv',
header=True,
schema=train_schema
)
# make the dataframe queryable as a temporary view
train.createOrReplaceTempView('train')
# show data
display(train)
# COMMAND ----------
# MAGIC %md When performing demand forecasting, we are often interested in general trends and seasonality. Let's start our exploration by examining the annual trend in unit sales:
# COMMAND ----------
# DBTITLE 1,View Yearly Trends
# MAGIC %sql
# MAGIC
# MAGIC SELECT
# MAGIC year(date) as year,
# MAGIC sum(sales) as sales
# MAGIC FROM train
# MAGIC GROUP BY year(date)
# MAGIC ORDER BY year;
# COMMAND ----------
# MAGIC %md It's very clear from the data that there is a generally upward trend in total unit sales across the stores. If we had better knowledge of the markets served by these stores, we might wish to identify whether there is a maximum growth capacity we'd expect to approach over the life of our forecast. But without that knowledge and by just quickly eyeballing this dataset, it feels safe to assume that if our goal is to make a forecast a few days, months or even a year out, we might expect continued linear growth over that time span.
# MAGIC
# MAGIC Now let's examine seasonality. If we aggregate the data around the individual months in each year, a distinct yearly seasonal pattern is observed which seems to grow in scale with overall growth in sales:
# COMMAND ----------
# DBTITLE 1,View Monthly Trends
# MAGIC %sql
# MAGIC
# MAGIC SELECT
# MAGIC TRUNC(date, 'MM') as month,
# MAGIC SUM(sales) as sales
# MAGIC FROM train
# MAGIC GROUP BY TRUNC(date, 'MM')
# MAGIC ORDER BY month;
# COMMAND ----------
# MAGIC %md Aggregating the data at a weekday level, a pronounced weekly seasonal pattern is observed with a peak on Sunday (weekday 0), a hard drop on Monday (weekday 1), and then a steady pickup over the week heading back to the Sunday high. This pattern seems to be pretty stable across the five years of observations:
# MAGIC
# MAGIC **UPDATE** As part of the Spark 3 move to the [Proleptic Gregorian calendar](https://databricks.com/blog/2020/07/22/a-comprehensive-look-at-dates-and-timestamps-in-apache-spark-3-0.html), the 'u' option in CAST(DATE_FORMAT(date, 'u') was removed. We are now using 'E to provide us a similar output.
# COMMAND ----------
# DBTITLE 1,View Weekday Trends
# MAGIC %sql
# MAGIC
# MAGIC SELECT
# MAGIC YEAR(date) as year,
# MAGIC (
# MAGIC CASE
# MAGIC WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0
# MAGIC WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1
# MAGIC WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2
# MAGIC WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3
# MAGIC WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4
# MAGIC WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5
# MAGIC WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6
# MAGIC END
# MAGIC ) % 7 as weekday,
# MAGIC AVG(sales) as sales
# MAGIC FROM (
# MAGIC SELECT
# MAGIC date,
# MAGIC SUM(sales) as sales
# MAGIC FROM train
# MAGIC GROUP BY date
# MAGIC ) x
# MAGIC GROUP BY year, weekday
# MAGIC ORDER BY year, weekday;
# COMMAND ----------
# MAGIC %md Now that we are oriented to the basic patterns within our data, let's explore how we might build a forecast.
# COMMAND ----------
# MAGIC %md ## Step 2: Build a Single Forecast
# MAGIC
# MAGIC Before attempting to generate forecasts for individual combinations of stores and items, it might be helpful to build a single forecast for no other reason than to orient ourselves to the use of prophet.
# MAGIC
# MAGIC Our first step is to assemble the historical dataset on which we will train the model:
# COMMAND ----------
# DBTITLE 1,Retrieve Data for a Single Item-Store Combination
# query to aggregate data to date (ds) level
sql_statement = '''
SELECT
CAST(date as date) as ds,
sales as y
FROM train
WHERE store=1 AND item=1
ORDER BY ds
'''
# assemble dataset in Pandas dataframe
history_pd = spark.sql(sql_statement).toPandas()
# drop any missing records
history_pd = history_pd.dropna()
# COMMAND ----------
# MAGIC %md Now, we will import the prophet library, but because it can be a bit verbose when in use, we will need to fine-tune the logging settings in our environment:
# COMMAND ----------
# DBTITLE 1,Import Prophet Library
from prophet import Prophet
import logging
# disable informational messages from prophet
logging.getLogger('py4j').setLevel(logging.ERROR)
# COMMAND ----------
# MAGIC %md Based on our review of the data, it looks like we should set our overall growth pattern to linear and enable the evaluation of weekly and yearly seasonal patterns. We might also wish to set our seasonality mode to multiplicative as the seasonal pattern seems to grow with overall growth in sales:
# COMMAND ----------
# DBTITLE 1,Train Prophet Model
# set model parameters
model = Prophet(
interval_width=0.95,
growth='linear',
daily_seasonality=False,
weekly_seasonality=True,
yearly_seasonality=True,
seasonality_mode='multiplicative'
)
# fit the model to historical data
model.fit(history_pd)
# COMMAND ----------
# MAGIC %md Now that we have a trained model, let's use it to build a 90-day forecast:
# COMMAND ----------
# DBTITLE 1,Build Forecast
# define a dataset including both historical dates & 90-days beyond the last available date
future_pd = model.make_future_dataframe(
periods=90,
freq='d',
include_history=True
)
# predict over the dataset
forecast_pd = model.predict(future_pd)
display(forecast_pd)
# COMMAND ----------
# MAGIC %md How did our model perform? Here we can see the general and seasonal trends in our model presented as graphs:
# COMMAND ----------
# DBTITLE 1,Examine Forecast Components
trends_fig = model.plot_components(forecast_pd)
display(trends_fig)
# COMMAND ----------
# MAGIC %md And here, we can see how our actual and predicted data line up as well as a forecast for the future, though we will limit our graph to the last year of historical data just to keep it readable:
# COMMAND ----------
# DBTITLE 1,View Historicals vs. Predictions
predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales')
# adjust figure to display dates from last year + the 90 day forecast
xlim = predict_fig.axes[0].get_xlim()
new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0)
predict_fig.axes[0].set_xlim(new_xlim)
display(predict_fig)
# COMMAND ----------
# MAGIC %md **NOTE** This visualization is a bit busy. Bartosz Mikulski provides [an excellent breakdown](https://www.mikulskibartosz.name/prophet-plot-explained/) of it that is well worth checking out. In a nutshell, the black dots represent our actuals with the darker blue line representing our predictions and the lighter blue band representing our (95%) uncertainty interval.
# COMMAND ----------
# MAGIC %md Visual inspection is useful, but a better way to evaluate the forecast is to calculate Mean Absolute Error, Mean Squared Error and Root Mean Squared Error values for the predicted relative to the actual values in our set:
# MAGIC
# MAGIC **UPDATE** A change in pandas functionality requires us to use *pd.to_datetime* to coerce the date string into the right data type.
# COMMAND ----------
# DBTITLE 1,Calculate Evaluation metrics
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error
from math import sqrt
from datetime import date
# get historical actuals & predictions for comparison
actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y']
predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat']
# calculate evaluation metrics
mae = mean_absolute_error(actuals_pd, predicted_pd)
mse = mean_squared_error(actuals_pd, predicted_pd)
rmse = sqrt(mse)
# print metrics to the screen
print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
# COMMAND ----------
# MAGIC %md prophet provides [additional means](https://facebook.github.io/prophet/docs/diagnostics.html) for evaluating how your forecasts hold up over time. You're strongly encouraged to consider using these and those additional techniques when building your forecast models but we'll skip this here to focus on the scaling challenge.
# COMMAND ----------
# MAGIC %md ## Step 3: Scale Forecast Generation
# MAGIC
# MAGIC With the mechanics under our belt, let's now tackle our original goal of building numerous, fine-grain models & forecasts for individual store and item combinations. We will start by assembling sales data at the store-item-date level of granularity:
# MAGIC
# MAGIC **NOTE**: The data in this data set should already be aggregated at this level of granularity but we are explicitly aggregating to ensure we have the expected data structure.
# COMMAND ----------
# DBTITLE 1,Retrieve Data for All Store-Item Combinations
sql_statement = '''
SELECT
store,
item,
CAST(date as date) as ds,
SUM(sales) as y
FROM train
GROUP BY store, item, ds
ORDER BY store, item, ds
'''
store_item_history = (
spark
.sql( sql_statement )
.repartition(sc.defaultParallelism, ['store', 'item'])
).cache()
# COMMAND ----------
# MAGIC %md With our data aggregated at the store-item-date level, we need to consider how we will pass our data to prophet. If our goal is to build a model for each store and item combination, we will need to pass in a store-item subset from the dataset we just assembled, train a model on that subset, and receive a store-item forecast back. We'd expect that forecast to be returned as a dataset with a structure like this where we retain the store and item identifiers for which the forecast was assembled and we limit the output to just the relevant subset of fields generated by the Prophet model:
# COMMAND ----------
# DBTITLE 1,Define Schema for Forecast Output
from pyspark.sql.types import *
result_schema =StructType([
StructField('ds',DateType()),
StructField('store',IntegerType()),
StructField('item',IntegerType()),
StructField('y',FloatType()),
StructField('yhat',FloatType()),
StructField('yhat_upper',FloatType()),
StructField('yhat_lower',FloatType())
])
# COMMAND ----------
# MAGIC %md To train the model and generate a forecast we will leverage a Pandas function. We will define this function to receive a subset of data organized around a store and item combination. It will return a forecast in the format identified in the previous cell:
# MAGIC
# MAGIC **UPDATE** With Spark 3.0, pandas functions replace the functionality found in pandas UDFs. The deprecated pandas UDF syntax is still supported but will be phased out over time. For more information on the new, streamlined pandas functions API, please refer to [this document](https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html).
# COMMAND ----------
# DBTITLE 1,Define Function to Train Model & Generate Forecast
def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame:
# TRAIN MODEL AS BEFORE
# --------------------------------------
# remove missing values (more likely at day-store-item level)
history_pd = history_pd.dropna()
# configure the model
model = Prophet(
interval_width=0.95,
growth='linear',
daily_seasonality=False,
weekly_seasonality=True,
yearly_seasonality=True,
seasonality_mode='multiplicative'
)
# train the model
model.fit( history_pd )
# --------------------------------------
# BUILD FORECAST AS BEFORE
# --------------------------------------
# make predictions
future_pd = model.make_future_dataframe(
periods=90,
freq='d',
include_history=True
)
forecast_pd = model.predict( future_pd )
# --------------------------------------
# ASSEMBLE EXPECTED RESULT SET
# --------------------------------------
# get relevant fields from forecast
f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')
# get relevant fields from history
h_pd = history_pd[['ds','store','item','y']].set_index('ds')
# join history and forecast
results_pd = f_pd.join( h_pd, how='left' )
results_pd.reset_index(level=0, inplace=True)
# get store & item from incoming data set
results_pd['store'] = history_pd['store'].iloc[0]
results_pd['item'] = history_pd['item'].iloc[0]
# --------------------------------------
# return expected dataset
return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]
# COMMAND ----------
# MAGIC %md There's a lot taking place within our function, but if you compare the first two blocks of code within which the model is being trained and a forecast is being built to the cells in the previous portion of this notebook, you'll see the code is pretty much the same as before. It's only in the assembly of the required result set that truly new code is being introduced and it consists of fairly standard Pandas dataframe manipulations.
# COMMAND ----------
# MAGIC %md Now let's call our pandas function to build our forecasts. We do this by grouping our historical dataset around store and item. We then apply our function to each group and tack on today's date as our *training_date* for data management purposes:
# MAGIC
# MAGIC **UPDATE** Per the previous update note, we are now using applyInPandas() to call a pandas function instead of a pandas UDF.
# COMMAND ----------
# DBTITLE 1,Apply Forecast Function to Each Store-Item Combination
from pyspark.sql.functions import current_date
results = (
store_item_history
.groupBy('store', 'item')
.applyInPandas(forecast_store_item, schema=result_schema)
.withColumn('training_date', current_date() )
)
results.createOrReplaceTempView('new_forecasts')
display(results)
# COMMAND ----------
# MAGIC %md We we are likely wanting to report on our forecasts, so let's save them to a queryable table structure:
# COMMAND ----------
# DBTITLE 1,Persist Forecast Output
# MAGIC %sql
# MAGIC -- create forecast table
# MAGIC create table if not exists forecasts (
# MAGIC date date,
# MAGIC store integer,
# MAGIC item integer,
# MAGIC sales float,
# MAGIC sales_predicted float,
# MAGIC sales_predicted_upper float,
# MAGIC sales_predicted_lower float,
# MAGIC training_date date
# MAGIC )
# MAGIC using delta
# MAGIC partitioned by (date);
# MAGIC
# MAGIC -- load data to it
# MAGIC merge into forecasts f
# MAGIC using new_forecasts n
# MAGIC on f.date = n.ds and f.store = n.store and f.item = n.item
# MAGIC when matched then update set f.date = n.ds,
# MAGIC f.store = n.store,
# MAGIC f.item = n.item,
# MAGIC f.sales = n.y,
# MAGIC f.sales_predicted = n.yhat,
# MAGIC f.sales_predicted_upper = n.yhat_upper,
# MAGIC f.sales_predicted_lower = n.yhat_lower,
# MAGIC f.training_date = n.training_date
# MAGIC when not matched then insert (date,
# MAGIC store,
# MAGIC item,
# MAGIC sales,
# MAGIC sales_predicted,
# MAGIC sales_predicted_upper,
# MAGIC sales_predicted_lower,
# MAGIC training_date)
# MAGIC values (n.ds,
# MAGIC n.store,
# MAGIC n.item,
# MAGIC n.y,
# MAGIC n.yhat,
# MAGIC n.yhat_upper,
# MAGIC n.yhat_lower,
# MAGIC n.training_date)
# COMMAND ----------
# MAGIC %md But how good (or bad) is each forecast? Using the pandas function technique, we can generate evaluation metrics for each store-item forecast as follows:
# COMMAND ----------
# DBTITLE 1,Apply Same Techniques to Evaluate Each Forecast
# schema of expected result set
eval_schema =StructType([
StructField('training_date', DateType()),
StructField('store', IntegerType()),
StructField('item', IntegerType()),
StructField('mae', FloatType()),
StructField('mse', FloatType()),
StructField('rmse', FloatType())
])
# define function to calculate metrics
def evaluate_forecast( evaluation_pd: pd.DataFrame ) -> pd.DataFrame:
# get store & item in incoming data set
training_date = evaluation_pd['training_date'].iloc[0]
store = evaluation_pd['store'].iloc[0]
item = evaluation_pd['item'].iloc[0]
# calculate evaluation metrics
mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] )
mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] )
rmse = sqrt( mse )
# assemble result set
results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]}
return pd.DataFrame.from_dict( results )
# calculate metrics
results = (
spark
.table('new_forecasts')
.filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data
.select('training_date', 'store', 'item', 'y', 'yhat')
.groupBy('training_date', 'store', 'item')
.applyInPandas(evaluate_forecast, schema=eval_schema)
)
results.createOrReplaceTempView('new_forecast_evals')
# COMMAND ----------
# MAGIC %md Once again, we will likely want to report the metrics for each forecast, so we persist these to a queryable table:
# COMMAND ----------
# DBTITLE 1,Persist Evaluation Metrics
# MAGIC %sql
# MAGIC
# MAGIC create table if not exists forecast_evals (
# MAGIC store integer,
# MAGIC item integer,
# MAGIC mae float,
# MAGIC mse float,
# MAGIC rmse float,
# MAGIC training_date date
# MAGIC )
# MAGIC using delta
# MAGIC partitioned by (training_date);
# MAGIC
# MAGIC insert into forecast_evals
# MAGIC select
# MAGIC store,
# MAGIC item,
# MAGIC mae,
# MAGIC mse,
# MAGIC rmse,
# MAGIC training_date
# MAGIC from new_forecast_evals;
# COMMAND ----------
# MAGIC %md We now have constructed a forecast for each store-item combination and generated basic evaluation metrics for each. To see this forecast data, we can issue a simple query (limited here to product 1 across stores 1 through 3):
# COMMAND ----------
# DBTITLE 1,Visualize Forecasts
# MAGIC %sql
# MAGIC
# MAGIC SELECT
# MAGIC store,
# MAGIC date,
# MAGIC sales_predicted,
# MAGIC sales_predicted_upper,
# MAGIC sales_predicted_lower
# MAGIC FROM forecasts a
# MAGIC WHERE item = 1 AND
# MAGIC store IN (1, 2, 3) AND
# MAGIC date >= '2018-01-01' AND
# MAGIC training_date=current_date()
# MAGIC ORDER BY store
# COMMAND ----------
# MAGIC %md And for each of these, we can retrieve a measure of help us assess the reliability of each forecast:
# COMMAND ----------
# DBTITLE 1,Retrieve Evaluation Metrics
# MAGIC %sql
# MAGIC
# MAGIC SELECT
# MAGIC store,
# MAGIC mae,
# MAGIC mse,
# MAGIC rmse
# MAGIC FROM forecast_evals a
# MAGIC WHERE item = 1 AND
# MAGIC training_date=current_date()
# MAGIC ORDER BY store