
SARplus

Pronounced surplus as it's simply better if not best!

Simple Algorithm for Recommendation (SAR) is a neighborhood-based algorithm for personalized recommendations based on user transaction history. SAR recommends items that are most similar to the ones that the user already has an affinity for. Two items are similar if the users that interacted with one item are also likely to have interacted with the other. A user has an affinity to an item if they have interacted with it in the past.
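
For intuition, here is a minimal, self-contained sketch of the item-item Jaccard similarity that this kind of algorithm builds on (illustration only, not SARplus internals; it reuses the toy interactions from the Usage section below):

# Toy illustration of item-item Jaccard similarity: two items are similar
# when the sets of users who interacted with them overlap.
from itertools import combinations

interactions = [(1, 1), (1, 2), (2, 1), (3, 1), (3, 3)]  # (user_id, item_id)

users_by_item = {}
for user, item in interactions:
    users_by_item.setdefault(item, set()).add(user)

for i, j in combinations(sorted(users_by_item), 2):
    a, b = users_by_item[i], users_by_item[j]
    jaccard = len(a & b) / len(a | b)
    print(f"sim(item {i}, item {j}) = {jaccard:.2f}")
# sim(item 1, item 2) = 0.33 -- one of item 1's three users also used item 2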

SARplus is an efficient implementation of this algorithm for Spark.

Features:

  • Scalable PySpark based implementation
  • Fast C++ based predictions
  • Reduced memory consumption: similarity matrix cached in-memory once per worker, shared across Python executors (see the sketch after this list)
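
The following is a minimal illustration of the read-only memory-mapping idea behind that last feature (not the actual C++ implementation; the file name and contents are stand-ins): processes that map the same file read-only share the underlying physical pages via the operating system, so the cache is held in memory only once per worker.

# Illustration only: map a (stand-in) cache file read-only with mmap.
import mmap
import os

cache_file = "similarity.cache"          # hypothetical cache file
if not os.path.exists(cache_file):
    with open(cache_file, "wb") as f:    # create stand-in content for the demo
        f.write(b"\x00" * 4096)

with open(cache_file, "rb") as f:
    view = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    print(len(view))                     # pages are loaded lazily and shared
    view.close()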

Benchmarks

| # Users | # Items | # Ratings | Runtime | Environment | Dataset |
|---------|---------|-----------|---------|-------------|---------|
| 2.5mio | 35k | 100mio | 1.3h | Databricks, 8 workers, Azure Standard DS3 v2 (4 core machines) | |

Top-K Recommendation Optimization

There are a couple of key optimizations:

  • map item ids (e.g. strings) to a contiguous set of indexes to optimize storage and simplify access
  • convert the similarity matrix to exactly the representation the C++ component needs, thus enabling simple shared memory mapping of the cache file and avoiding parsing. This requires a custom formatter, written in Scala
  • shared read-only memory mapping allows us to re-use the same memory from multiple Python executors on the same worker node
  • partition the input test users and past seen items by users, allowing for scale out
  • perform as much of the work as possible in PySpark (way simpler)
  • top-k computation (see the sketch below)
    • reverse the join: start from the user's past seen items, join them to their related items, and sum the contributions per candidate item
    • make sure to always keep just the top-k items in memory
    • use a standard join, with binary search, between a user's past seen items and the related items

(Figure: sarplus top-k recommendation optimization)
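
Below is a minimal pure-Python sketch of the reverse-join top-k idea (illustration only; the similarity values are made up and the heapq-based selection is a stand-in for what the C++ component does far more efficiently):

import heapq

# Hypothetical symmetric item-item similarities: sims[i] -> (related_item, similarity).
sims = {
    1: [(2, 0.4), (3, 0.2)],
    2: [(1, 0.4), (3, 0.5)],
    3: [(1, 0.2), (2, 0.5)],
}
seen = {1, 3}   # one user's past seen items
top_k = 2

# Reverse join: start from the seen items, look up their related items and
# accumulate the similarity contributions per candidate item.
scores = {}
for item in seen:
    for related, similarity in sims.get(item, []):
        scores[related] = scores.get(related, 0.0) + similarity

# Keep only the k best candidates in memory.
print(heapq.nlargest(top_k, scores.items(), key=lambda kv: kv[1]))
# item 2 ranks first: 0.4 (via item 1) + 0.5 (via item 3) = 0.9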

Usage

Two packages should be installed: the pysarplus Python package (from PyPI) and the sarplus Scala package (com.microsoft.sarplus, from Maven Central).

Python

from pysarplus import SARPlus

# spark dataframe with user/item/rating/optional timestamp tuples
train_df = spark.createDataFrame(
    [(1, 1, 1), (1, 2, 1), (2, 1, 1), (3, 1, 1), (3, 3, 1)],
    ["user_id", "item_id", "rating"]
)

# spark dataframe with user/item tuples
test_df = spark.createDataFrame(
    [(1, 1, 1), (3, 3, 1)],
    ["user_id", "item_id", "rating"],
)

# To use C++ based fast prediction, a local cache directory needs to be
# specified.
# * On local machine, `cache_path` can be any valid directories. For example,
#
#   ```python
#   model = SARPlus(
#       spark,
#       col_user="user_id",
#       col_item="item_id",
#       col_rating="rating",
#       col_timestamp="timestamp",
#       similarity_type="jaccard",
#       cache_path="cache",
#   )
#   ```
#
# * On Databricks, `cache_path` needs to be mounted on DBFS.  For example,
#
#   ```python
#   model = SARPlus(
#       spark,
#       col_user="user_id",
#       col_item="item_id",
#       col_rating="rating",
#       col_timestamp="timestamp",
#       similarity_type="jaccard",
#       cache_path="dbfs:/mnt/sarpluscache/cache",
#   )
#   ```
#
# * On Azure Synapse, `cache_path` needs to be mounted on Spark pool's driver
#   node.  For example,
#
#   ```python
#   model = SARPlus(
#       spark,
#       col_user="user_id",
#       col_item="item_id",
#       col_rating="rating",
#       col_timestamp="timestamp",
#       similarity_type="jaccard",
#       cache_path=f"synfs:/{job_id}/mnt/sarpluscache/cache",
#   )
#   ```
#
#   where `job_id` can be obtained by
#
#   ```python
#   from notebookutils import mssparkutils
#   job_id = mssparkutils.env.getJobId()
#   ```
model = SARPlus(
    spark,
    col_user="user_id",
    col_item="item_id",
    col_rating="rating",
    col_timestamp="timestamp",
    similarity_type="jaccard",
)
model.fit(train_df)

# To use C++ based fast prediction, the `use_cache` parameter of
# `SARPlus.recommend_k_items()` also needs to be set to `True`.
#
# ```
# model.recommend_k_items(test_df, top_k=3, use_cache=True).show()
# ```
model.recommend_k_items(test_df, top_k=3, remove_seen=False).show()
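
Putting the cache_path and use_cache pieces described in the comments above together, the C++ based fast prediction path looks like this (reusing train_df and test_df from above; the local cache_path value is illustrative):

# Fast (C++ based) prediction: pass `cache_path` when constructing the model
# and `use_cache=True` when recommending.
fast_model = SARPlus(
    spark,
    col_user="user_id",
    col_item="item_id",
    col_rating="rating",
    col_timestamp="timestamp",
    similarity_type="jaccard",
    cache_path="cache",
)
fast_model.fit(train_df)
fast_model.recommend_k_items(test_df, top_k=3, use_cache=True).show()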

Jupyter Notebook

Insert this cell prior to the code above.

import os

SARPLUS_MVN_COORDINATE = "com.microsoft.sarplus:sarplus_2.12:0.6.6"
SUBMIT_ARGS = f"--packages {SARPLUS_MVN_COORDINATE} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("sample")
    .master("local[*]")
    .config("memory", "1G")
    .config("spark.sql.shuffle.partitions", "1")
    .config("spark.sql.crossJoin.enabled", True)
    .config("spark.sql.sources.default", "parquet")
    .config("spark.sql.legacy.createHiveTableByDefault", True)
    .config("spark.ui.enabled", False)
    .getOrCreate()
)

PySpark Shell

SARPLUS_MVN_COORDINATE="com.microsoft.sarplus:sarplus_2.12:0.6.6"

# Install pysarplus
pip install pysarplus

# Specify sarplus maven coordinate and configure Spark environment
pyspark --packages "${SARPLUS_MVN_COORDINATE}" \
        --conf spark.sql.crossJoin.enabled=true \
        --conf spark.sql.sources.default=parquet \
        --conf spark.sql.legacy.createHiveTableByDefault=true

Databricks

Install libraries

  1. Navigate to your Databricks Workspace
  2. Create Library
  3. Under Library Source select Maven
  4. Enter into Coordinates:
    • com.microsoft.sarplus:sarplus_2.12:0.6.6
    • or com.microsoft.sarplus:sarplus-spark-3-2-plus_2.12:0.6.6 (if you're on Spark 3.2+)
  5. Hit Create
  6. Attach to your cluster
  7. Create 2nd library
  8. Under Library Source select PyPI
  9. Enter pysarplus==0.6.6
  10. Hit Create

This will install C++, Python and Scala code on your cluster. See Libraries for details on how to install libraries on Azure Databricks.

Configurations

  1. Navigate to your Databricks Compute

  2. Navigate to your cluster's Configuration -> Advanced options -> Spark

  3. Put the following configurations into Spark config

    spark.sql.crossJoin.enabled true
    spark.sql.sources.default parquet
    spark.sql.legacy.createHiveTableByDefault true
    

These will set the crossJoin property to enable calculation of the similarity matrix, and set default sources to parquet.

These settings can also be applied by putting the following Python code in a notebook cell:

spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.conf.set("spark.sql.sources.default", "parquet")
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "true")

Prepare local file system for cache

To use C++ based fast prediction in pysarplus.SARPlus.recommend_k_items(), a local cache directory needs to be specified via the cache_path parameter of pysarplus.SARPlus() to store intermediate files during the calculation. On Databricks this means you also have to mount shared storage.

For example, you can create a storage account (e.g. sarplusstorage) and a container (e.g. sarpluscache) in the storage account, copy the access key of the storage account, and then run the following code to mount the storage.

dbutils.fs.mount(
  source = "wasbs://<container>@<storage-account>.blob.core.windows.net",
  mount_point = "/mnt/<container>",
  extra_configs = {
    "fs.azure.account.key.<storage-account>.blob.core.windows.net":
    "<access-key>"
  }
)

where <storage-account>, <container> and <access-key> should be replaced with the actual values, such as sarplusstorage, sarpluscache and the access key of the storage account. Then pass cache_path="dbfs:/mnt/<container>/cache" to pysarplus.SARPlus(), where cache is the cache's name.
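
For example, with the mount above and the illustrative sarpluscache container name, the model would be constructed like this (a sketch mirroring the commented example in the Usage section):

# Illustrative: pass the DBFS-mounted cache directory as `cache_path`.
model = SARPlus(
    spark,
    col_user="user_id",
    col_item="item_id",
    col_rating="rating",
    col_timestamp="timestamp",
    similarity_type="jaccard",
    cache_path="dbfs:/mnt/sarpluscache/cache",
)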

To disable logging messages:

import logging
logging.getLogger("py4j").setLevel(logging.ERROR)

Azure Synapse

Install libraries

  1. Download pysarplus WHL file from pysarplus@PyPI

  2. Download sarplus JAR file from sarplus@MavenCentralRepository

    (or sarplus-spark-3-2-plus@MavenCentralRepository if run on Spark 3.2+)

  3. Navigate to your Azure Synapse workspace -> Manage -> Workspace packages

  4. Upload pysarplus WHL file and sarplus JAR file as workspace packages

  5. Navigate to your Azure Synapse workspace -> Manage -> Apache Spark pools

  6. Find the Spark pool on which to install the packages -> ... -> Packages -> Workspace packages -> + Select from workspace packages, and select the pysarplus WHL file and sarplus JAR file uploaded in the previous step

  7. Apply

pysarplus can also be installed via requirements.txt. See Manage libraries for Apache Spark in Azure Synapse Analytics for details on how to manage libraries in Azure Synapse.

Prepare local file system for cache

To use C++ based fast prediction in pysarplus.SARPlus.recommend_k_items(), a local cache directory needs to be specified via the cache_path parameter of pysarplus.SARPlus() to store intermediate files during the calculation. On Azure Synapse this means you also have to mount shared storage.

For example, you can run the following code to mount the file system (container) of the default/primary storage account.

from notebookutils import mssparkutils
mssparkutils.fs.mount(
    "abfss://<container>@<storage-account>.dfs.core.windows.net",
    "/mnt/<container>",
    { "linkedService": "<storage-linked-service>"}
)
job_id = mssparkutils.env.getJobId()

Then pass cache_path=f"synfs:/{job_id}/mnt/<container>/cache" to pysarplus.SARPlus(), where cache is the cache's name. NOTE: job_id should be prepended to the local path.
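
For example, with the illustrative sarpluscache container name, the model would be constructed like this (a sketch mirroring the commented example in the Usage section):

# Illustrative: pass the Synapse-mounted cache directory as `cache_path`
# (note the job_id prefix required by synfs paths).
model = SARPlus(
    spark,
    col_user="user_id",
    col_item="item_id",
    col_rating="rating",
    col_timestamp="timestamp",
    similarity_type="jaccard",
    cache_path=f"synfs:/{job_id}/mnt/sarpluscache/cache",
)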

See How to use file mount/unmount API in Synapse for more details.

Development

See DEVELOPMENT.md for implementation details and development information.