Using NXCALS via pyspark on lxplus
Prepared by Ilias
You need to have been granted access to the NXCALS service and to be a member of the egroup it-hadoop-nxcals-pro-analytics. Please follow the instructions here.
As the software/bundles are installed in a public area, they can be used from any lxplus account. Ilias provided some shell scripts to facilitate their use:
Go to your working folder (some links/subfolders will be created there).
Use the following command to activate the conda environment:
conda activate nxcals-lcg95-env
The following command, instead, needs to be run only once; it creates in your working folder the soft links to the NXCALS bundle files:
It is done.
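As a quick sanity check that the right environment is active, you can confirm which python interpreter the shell now picks up; this is only an illustrative snippet, not part of the provided scripts:

import sys
# Confirm which interpreter the activated conda environment provides;
# the path should point inside the nxcals-lcg95-env environment.
print(sys.executable)
print(sys.version)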
Accessing NXCALS data
In a directory configured with the commands above, you can use the pre-defined commands of the NXCALS bundle to:
- run an interactive session with pyspark, which can be further configured with the additional options available from its help menu:
./spark-home/bin/pyspark
A simple command to test could be:
from cern.nxcals.api.extraction.data.builders import *

data = DataQuery.builder(spark).byVariables() \
    .system('CMW') \
    .startTime('2018-07-20 13:38:00.000').endTime('2018-07-20 13:39:00.000') \
    .variable('LHC.BOFSU:TUNE_B1_V') \
    .build()
data.show(5)
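The object returned by build() is a regular Spark DataFrame, so the usual pyspark operations can be chained on it. As an illustrative follow-up in the same session (the column names nxcals_timestamp and nxcals_value are assumed here, following the pattern used in the yarn example further below):

# Illustrative only: inspect the schema and pull a small, ordered sample into pandas.
data.printSchema()
sample = data.select('nxcals_timestamp', 'nxcals_value') \
    .orderBy('nxcals_timestamp') \
    .limit(10) \
    .toPandas()
print(sample)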
- run a python script interactively as a standalone application (without using yarn); such a script can also be put in a crontab job or run in HTCondor.
An example test.py is the following:
#
# -- simple test file
#
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from cern.nxcals.api.extraction.data.builders import *

conf = SparkConf()
# Possible master values (the location to run the application):
#   local:    Run Spark locally with one worker thread (that is, no parallelism).
#   local[K]: Run Spark locally with K worker threads. (Ideally, set this to the number of cores on your host.)
#   local[*]: Run Spark locally with as many worker threads as logical cores on your host.
#   yarn:     Run using a YARN cluster manager.
# conf.setMaster('yarn')
conf.setMaster('local[*]')
conf.setAppName('spark-basic')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

intensity = DevicePropertyDataQuery.builder(spark).system("CMW") \
    .startTime("2018-06-17 00:00:00.000").endTime("2018-06-20 00:00:00.000") \
    .entity().parameter("PR.BCT/HotspotIntensity").build()

# count the data points
print('>>> data : ', intensity.count())
intensity.show()
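If the extracted data should be reused offline, one possibility is to let Spark write it out as parquet, e.g. at the end of test.py. This is only a sketch; the output path hotspot_intensity.parquet is arbitrary and not part of the original script:

# Illustrative extension of test.py: persist the extracted DataFrame for later analysis.
intensity.write.mode('overwrite').parquet('hotspot_intensity.parquet')  # hypothetical output path

# It can be read back in any later Spark session:
df_back = spark.read.parquet('hotspot_intensity.parquet')
print('rows read back:', df_back.count())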
- run a python script interactively as a standalone application (using yarn) by using:
./spark-home/bin/spark-submit --master yarn --executor-memory 4G --total-executor-cores 8 test_yarn.py
An example test_yarn.py is the following:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from cern.nxcals.api.extraction.data.builders import *
from pyspark.sql.functions import col
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import DoubleType
from pyspark.sql.types import ArrayType
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
import pyspark.sql.functions as func
import time
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import os

# --- Configure spark
conf = SparkConf()
# Possible master values (the location to run the application):
#   local:    Run Spark locally with one worker thread (that is, no parallelism).
#   local[K]: Run Spark locally with K worker threads. (Ideally, set this to the number of cores on your host.)
#   local[*]: Run Spark locally with as many worker threads as logical cores on your host.
#   yarn:     Run using a YARN cluster manager.
conf.setMaster('yarn')
# conf.setMaster('local[*]')
conf.setAppName('spark-basic')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# --- initial settings
tstart = pd.Timestamp('2018-10-23 11:22:40.094000101+0000', tz='UTC')
tend = pd.Timestamp('2018-10-23 13:36:20.035000086+0000', tz='UTC')
variable = 'LHC.BCTFR.A6R4.B1:BUNCH_FILL_PATTERN'
variable_spark = 'LHC@BCTFR@A6R4@B1:BUNCH_FILL_PATTERN'
t1 = tstart.tz_convert('UTC').tz_localize(None)
t2 = tend.tz_convert('UTC').tz_localize(None)

# --- get the data
ds = DataQuery.builder(spark).byVariables().system("CMW").startTime(t1).endTime(t2).variable(variable).buildDataset()
auxdf = ds.select('nxcals_timestamp', 'nxcals_value.elements').withColumnRenamed('nxcals_timestamp', 'timestamp') \
    .withColumnRenamed('elements', variable_spark)

print('\n\nThis is the result:\n')
print(auxdf.count())
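Once the result is small enough, it can be brought back to pandas and the raw timestamps converted to datetimes. This is a sketch that assumes, as is usual for NXCALS data, that nxcals_timestamp holds nanoseconds since the epoch (UTC):

# Illustrative post-processing, not part of the original test_yarn.py:
# collect the result to pandas and convert the nanosecond timestamps to datetimes.
pdf = auxdf.toPandas()
pdf['timestamp'] = pd.to_datetime(pdf['timestamp'], unit='ns', utc=True)
print(pdf.head())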
Install your own NXCALS bundle
If you want to install your own bundle, you can follow the steps below.
- download the Miniconda3 Linux 64-bit installer
- execute it with bash Miniconda3-latest-Linux-x86_64.sh and follow the installer prompts
- use ~lumimod/public/miniconda3 as the installation directory
- remove the conda initialization lines from .bashrc and put them in a separate file (see below)
Install nxcals bundle
Follow the instructions from NXCALS Documentation. Navigate to:
Public APIs > Data Access Methods > NXCALS Spark bundle
Create the installation script:
#!/bin/bash
curl -s -k -O http://photons-resources.cern.ch/downloads/nxcals_pro/spark/spark-nxcals.zip
unzip spark-nxcals.zip
rm spark-nxcals.zip
cd spark-*-bin-hadoop2.7
and install the software in the ~/lumimod/public/nxcals directory (replace with your desired path).
To remain compatible with the NXCALS installation in SWAN, you can configure the python environment as in LCG95. Create a minimal set of packages for our needs in lcg95_basic.txt:
pandas==0.23.3
numpy==1.14.2
matplotlib==2.2.2
pyarrow==0.8.0
conda create -n nxcals-lcg95-env --file lcg95_basic.txt
conda env list                     # list the available environments
conda activate nxcals-lcg95-env    # activate an environment
conda deactivate                   # when done, exit the environment
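To verify that the activated environment matches the versions pinned in lcg95_basic.txt, a small illustrative check can be run inside it:

# Illustrative check of the packages pinned in lcg95_basic.txt.
import pandas, numpy, matplotlib, pyarrow
for mod in (pandas, numpy, matplotlib, pyarrow):
    print(mod.__name__, mod.__version__)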