Cosa sono i comandi Spark Shell?

Spark shell è un'interfaccia utilizzata per scrivere query ad hoc per funzionare e comprendere il comportamento di Apache Spark. Si chiama il motore open source del cluster computing che può eseguire l'elaborazione in memoria di dati come per analisi, ETL, machine learning per enormi set di dati. In questo argomento, impareremo i comandi di Spark Shell.

Esistono vari tipi di Spark shell per diversi linguaggi di programmazione:

  1. spark-shell è scritto in Scala
  2. pyspark è in Python e
  3. sparkR per linguaggio R.

Si può sviluppare la sua applicazione standalone con l'aiuto di Spark. È ampiamente utilizzato a causa della sua velocità computazionale superveloce. Questo perché utilizza MapReduce per elaborare varie query e trasformazioni.

Per eseguire i comandi spark-shell, richiede che Java e Scala siano già installati nel sistema.

Tipi di comandi Spark Shell

I vari tipi di comandi Spark-shell sono i seguenti:

1. Per verificare se Spark è installato e per conoscerne la versione, viene utilizzato il comando seguente (Tutti i comandi di seguito devono essere indicati a partire da questo simbolo "$")

$ spark-shell

Il seguente output viene visualizzato se è installata la scintilla:

$ spark-shell

SPARK_MAJOR_VERSION è impostato su 2, utilizzando Spark2

Impostazione del livello di registro predefinito su "WARN".

Per regolare il livello di registrazione, utilizzare sc.setLogLevel (newLevel). Per SparkR, utilizzare setLogLevel (newLevel).

Interfaccia utente Web del contesto Spark disponibile all'indirizzo http://10.113.59.34:4040

Contesto Spark disponibile come 'sc' (master = local (*), id app = local-1568732886588).

Sessione Spark disponibile come 'spark'.

Benvenuto a

____ __

/ __ / __ ___ _____ / / __

_ \ \ / _ \ / _ `/ __ / '_ /

/ ___ / .__ / \ _, _ / _ / / _ / \ _ \ versione 2.2.0.2.6.3.0-235

/ _ /

Utilizzo di Scala versione 2.11.8 (Java HotSpot (TM) VM a 64 bit Server, Java 1.8.0_112)

Digita le espressioni per farle valutare.

Digitare: aiuto per ulteriori informazioni.

scala>

2. La struttura dei dati di base di Spark è chiamata RDD (Resilient Distributed Dataset) che contiene una raccolta immutabile di oggetti per il calcolo distribuito dei record. Tutti i set di dati di RDD sono partizionati logicamente su più nodi di un cluster.

Un RDD può essere creato solo leggendo da un file system locale o trasformando un RDD esistente.

a) Per creare un nuovo RDD utilizziamo il seguente comando:

scala> val examplefile = sc.textFile("file.txt")

Qui sc viene chiamato l'oggetto di SparkContext.

Produzione:

examplefile: org.apache.spark.rdd.RDD(String) = file.txt MapPartitionsRDD(3) at textFile at :24

b) Un RDD può essere creato tramite Raccolta parallela come segue:

scala> val oddnum = Array(1, 3, 5, 7, 9)

Produzione:

oddnum: Array(Int) = Array(1, 3, 5, 7, 9)
scala> val value = sc.parallelize(oddnum)

Produzione:

value: org.apache.spark.rdd.RDD(Int) = ParallelCollectionRDD(4) at parallelize at :26

c) Per creare da RDD esistenti :

scala> val newRDD = oddnum.map(value => (value * 2))

Produzione:

newRDD: Array(Int) = Array(2, 6, 10, 14, 18)

3. Esistono due tipi di operazioni Spark RDD che possono essere eseguite sui set di dati creati:

  • Azioni
  • trasformazioni

Azioni: viene utilizzato per eseguire determinate operazioni richieste sui set di dati esistenti. Di seguito sono riportati alcuni dei comandi che è possibile utilizzare per eseguire le seguenti azioni sui set di dati creati:

a) funzione count () per contare il numero di elementi in RDD:

scala> value.count()

Produzione:

res3: Long = 5

b) funzione collect () per visualizzare tutti gli elementi dell'array:

scala> value.collect()

Produzione:

res5: Array(Int) = Array(1, 3, 5, 7, 9)

c) funzione first () utilizzata per visualizzare il primo elemento del set di dati:

scala> value.first()

Produzione:

res4: Int = 1

d) la funzione take (n) visualizza i primi n elementi dell'array:

scala> value.take(3)

Produzione:

res6: Array(Int) = Array(1, 3, 5)

e) La funzione takeSample (withReplacement, num, (seed)) visualizza un array casuale di elementi "num" in cui il seed è per il generatore di numeri casuali.

scala> value.takeSample(false, 3, System.nanoTime.toInt)

Produzione:

res8: Array(Int) = Array(3, 1, 7)

f) La funzione saveAsTextFile (path) salva il set di dati nel percorso specificato della posizione di hdfs

scala> value.saveAsTextFile("/user/valuedir")

g) partizioni. La funzione lunghezza può essere utilizzata per trovare il numero di partizioni nel RDD

scala> value.partitions.length

Produzione:

res1: Int = 8

Trasformazioni RDD

La trasformazione viene utilizzata per formare un nuovo RDD da quelli esistenti. Poiché gli input dell'RDD sono immutabili, il risultato formato durante la trasformazione può essere uno o più RDD come output.

Esistono due tipi di trasformazioni:

  • Trasformazioni strette
  • Ampie trasformazioni

Trasformazioni strette - Ogni RDD padre è diviso in varie partizioni e tra queste solo una partizione verrà utilizzata dal figlio RDD.

Esempio: map () e filter () sono i due tipi base di trasformazioni di base che vengono chiamate quando viene chiamata un'azione.

  • La funzione map (func) opera su ciascuno degli elementi nel "valore" del set di dati in modo iterativo per produrre l'uscita RDD.

Esempio: in questo esempio, stiamo aggiungendo il valore 10 a ciascuno degli elementi del valore del set di dati e visualizzando l'output trasformato con l'aiuto della funzione di raccolta.

scala> val mapfunc = value.map(x => x+10)
mapfunc: org.apache.spark.rdd.RDD(Int) = MapPartitionsRDD(3) at map at :28

scala> mapfunc.collect
res2: Array(Int) = Array(11, 13, 15, 17, 19)

La funzione filter (func) è sostanzialmente usata per filtrare gli elementi che soddisfano una particolare condizione specificata usando la funzione.

Esempio: in questo esempio, stiamo cercando di recuperare tutti gli elementi tranne il numero 2 del set di dati “valore” e di recuperare l'output tramite la funzione di raccolta.

scala> val fill = value.filter(x => x!=2)
fill: org.apache.spark.rdd.RDD(Int) = MapPartitionsRDD(7) at filter at :28

scala> fill.collect
res8: Array(Int) = Array(4, 6, 8, 10)

Trasformazioni estese: una partizione RDD a genitore singolo è condivisa su varie partizioni RDD figlio multiple.

Esempio: groupbykey e reducebyKey sono esempi di ampie trasformazioni.

  • La funzione groupbyKey raggruppa i valori del set di dati in coppie chiave-valore in base ai valori chiave di un altro RDD. Questo processo comporta lo shuffle che ha luogo quando il raggruppamento per funzione raccoglie i dati associati a una particolare chiave e li memorizza in una singola coppia chiave-valore.

Esempio: In questo esempio, stiamo assegnando i numeri interi 5, 6 al valore stringa “chiave” e il numero intero 8 assegnato a “8” che vengono visualizzati nello stesso formato di coppia chiave-valore nell'output.

scala> val data = spark.sparkContext.parallelize(Array(("key", 5), ("val", 8), ("key", 6)), 3)
data: org.apache.spark.rdd.RDD((String, Int)) = ParallelCollectionRDD(13) at parallelize at :23

scala> val group = data.groupByKey().collect()
group: Array((String, Iterable(Int))) = Array((key, CompactBuffer(5, 6)), (val, CompactBuffer(8)))

scala> group.foreach(println)
(key, CompactBuffer(5, 6))
(val, CompactBuffer(8))

  • La funzione reduceByKey combina anche le coppie chiave-valore di diversi RDD. Combina le chiavi e i rispettivi valori in un singolo elemento dopo aver eseguito la trasformazione menzionata.

Esempio: in questo esempio, i tasti comuni dell'array “lettere” vengono prima parallelizzati dalla funzione e ogni lettera viene mappata con il conteggio 10 su di essa. ReduceByKey aggiungerà i valori con chiavi simili e salva nella variabile value2. L'output viene quindi visualizzato utilizzando la funzione di raccolta.

scala> val letters = Array("A", "B", "C", "D", "B", "C", "E", "D")
letters: Array(String) = Array(A, B, C, D, B, C, E, D)

scala> val value2 = spark.sparkContext.parallelize(letters).map(w => (w, 10)).reduceByKey(_+_)
value2: org.apache.spark.rdd.RDD((String, Int)) = ShuffledRDD(20) at reduceByKey at :25

scala> value2.foreach(println)
(C, 20)
(E, 10)
(D, 20)
(B, 20)
(A, 10)

Insieme alle azioni sopra menzionate come il partizionamento su RDD e l'esecuzione di azioni / trasformazioni su di esse, Spark supporta anche la memorizzazione nella cache, utile quando vengono chiamati ricorsivamente gli stessi dati.

Con l'aiuto di tutte queste proprietà, Apache Spark può elaborare enormi volumi di dati ed eseguire l'elaborazione in batch e l'elaborazione in streaming. Il calcolo in memoria eseguito da Spark è responsabile dell'elaborazione estremamente rapida delle applicazioni. Quindi Spark è il metodo di riferimento grazie alla sua versatilità di programmazione su lingue diverse, facilità d'uso e capacità di integrazione.

Articoli consigliati

Questa è una guida ai comandi Spark Shell. Qui discutiamo i vari tipi di comandi Spark Shell per diversi linguaggi di programmazione. Puoi anche leggere il seguente articolo per saperne di più -

  1. Comandi di scripting della shell
  2. Come installare Spark
  3. Domande di intervista a Spark
  4. Comandi Spark
  5. Test ad hoc
  6. Generatore di numeri casuali in JavaScript
  7. Guida all'elenco dei comandi Unix Shell
  8. PySpark SQL | Moduli e metodi di PySpark SQL
  9. Per Loop in Shell Scripting | Come funziona il loop?
  10. Comandi di script batch con esempi
  11. Panoramica completa dei componenti Spark

Categoria: