logo

PySpark SQL

Apache Spark היא התוכנה המצליחה ביותר של Apache Software Foundation ומיועדת למחשוב מהיר. מספר תעשיות משתמשות ב- Apache Spark כדי למצוא את הפתרונות שלהן. PySpark SQL הוא מודול ב-Spark המשלב עיבוד יחסי עם API לתכנות פונקציונלי של Spark. אנו יכולים לחלץ את הנתונים באמצעות שפת שאילתות SQL. אנחנו יכולים להשתמש בשאילתות זהה לשפת SQL.

אם יש לך הבנה בסיסית של RDBMS, PySpark SQL יהיה קל לשימוש, שבו תוכל להרחיב את המגבלה של עיבוד נתונים יחסיים מסורתיים. Spark תומך גם בשפת השאילתה של Hive, אך ישנן מגבלות של מסד הנתונים של Hive. Spark SQL פותח כדי להסיר את החסרונות של מסד הנתונים של Hive. בואו נסתכל על החסרונות הבאים של Hive:

חסרונות של Hive

  • זה לא יכול לחדש את העיבוד, כלומר אם הביצוע נכשל באמצע זרימת עבודה, אתה לא יכול להמשיך מהמקום שבו הוא נתקע.
  • אנחנו לא יכולים להפיל את מסדי הנתונים המוצפנים במפל כאשר האשפה מופעלת. זה מוביל לשגיאת הביצוע. כדי להסיר סוג כזה של מסד נתונים, המשתמשים צריכים להשתמש באפשרות טיהור.
  • השאילתות אד-הוק מבוצעות באמצעות MapReduce, אשר מושק על ידי ה-Hive, אך כאשר אנו מנתחים את מסד הנתונים בגודל בינוני, זה מעכב את הביצועים.
  • Hive אינו תומך בפעולת העדכון או המחיקה.
  • זה מוגבל לתמיכה בשאילתות המשנה.

חסרונות אלו הם הסיבות לפתח את ה-Apache SQL.

הקדמה קצרה של PySpark SQL

PySpark תומך בעיבוד יחסי משולב עם התכנות הפונקציונלי של Spark. הוא מספק תמיכה במקורות הנתונים השונים כדי לאפשר לטוות שאילתות SQL עם טרנספורמציות קוד, ובכך נוצר כלי חזק מאוד.

PySpark SQL יוצר את החיבור בין ה-RDD לטבלת היחסים. הוא מספק אינטגרציה הרבה יותר קרובה בין עיבוד יחסי ופרוצדורלי באמצעות Dataframe API הצהרתי, המשולב עם קוד Spark.

דנאשרי ורמה

באמצעות SQL, זה יכול להיות נגיש בקלות ליותר משתמשים ולשפר את האופטימיזציה למשתמשים הנוכחיים. זה גם תומך במגוון הרחב של מקורות נתונים ואלגוריתמים ב-Big-data.

תכונה של PySpark SQL

התכונות של PySpark SQL ניתנות להלן:

1) גישה לנתוני עקביות

זה מספק גישה עקבית לנתונים, כלומר SQL תומך בדרך משותפת לגשת למגוון מקורות נתונים כמו Hive, Avro, Parquet, JSON ו-JDBC. זה ממלא תפקיד משמעותי בהכלת כל המשתמשים הקיימים לתוך Spark SQL.

2) שילוב עם ספארק

שאילתות PySpark SQL משולבות עם תוכניות Spark. אנחנו יכולים להשתמש בשאילתות בתוך תוכניות Spark.

אחד היתרונות הגדולים ביותר שלו הוא שמפתחים לא צריכים לנהל ידנית כשל במצב או לשמור את היישום מסונכרן עם עבודות אצווה.

3) קישוריות סטנדרטית

הוא מספק חיבור דרך JDBC או ODBC, ושני אלה הם הסטנדרטים בתעשייה לקישוריות לכלי בינה עסקית.

4) פונקציות בהגדרת משתמש

ל- PySpark SQL יש שפה משולבת פונקציה מוגדרת על ידי משתמש (UDF). UDF משמש להגדרת פונקציה חדשה מבוססת עמודות המרחיבה את אוצר המילים של ה-DSL של Spark SQL להמרת DataFrame.

חברה מול חברה

5) תאימות כוורת

PySpark SQL מריץ שאילתות Hive ללא שינוי על נתונים נוכחיים. זה מאפשר תאימות מלאה לנתוני Hive הנוכחיים.

PySpark SQL מודול

כמה מחלקות חשובות של Spark SQL ו-DataFrames הן הבאות:

    pyspark.sql.SparkSession:זה מייצג את נקודת הכניסה העיקרית עבור DataFrame ופונקציונליות SQL.pyspark.sql.DataFrame:הוא מייצג אוסף מבוזר של נתונים המקובצים בעמודות עם שם.pyspark.sql.Column:הוא מייצג ביטוי עמודה ב-a DataFrame. pyspark.sql.Row:הוא מייצג שורת נתונים ב-a DataFrame. pyspark.sql.GroupedData:שיטות צבירה, הוחזרו על ידי DataFrame.groupBy(). pyspark.sql.DataFrameNaFunctions:הוא מייצג שיטות לטיפול בנתונים חסרים (ערכים אפס).pyspark.sql.DataFrameStatFunctions:הוא מייצג שיטות לפונקציונליות סטטיסטיקה.pysark.sql.functions:זה מייצג רשימה של פונקציות מובנות הזמינות עבור DataFrame. pyspark.sql.types:הוא מייצג רשימה של סוגי נתונים זמינים.pyspark.sql.Window:הוא משמש לעבודה עם פונקציות של חלון.

שקול את הדוגמה הבאה של PySpark SQL.

 import findspark findspark.init() import pyspark # only run after findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.sql('''select 'spark' as hello ''') df.show() 

תְפוּקָה:

 +-----+ |hello| +-----+ |spark| +-----+ 

הסבר קוד:

בקוד לעיל, ייבאנו את findspark מודול ונקרא findspark.init() בַּנַאִי; לאחר מכן, ייבאנו את מודול SparkSession כדי ליצור הפעלת spark.

בנאים ב-java

מ-pyspark.sql ייבוא ​​SparkSession

ניתן להשתמש בסשן ניצוץ ליצירת ה-Dataset ו-DataFrame API. ניתן להשתמש ב-SparkSession גם ליצירת DataFrame, רישום DataFrame כטבלה, ביצוע SQL מעל טבלאות, טבלת מטמון וקריאת קובץ פרקט.

בונה כיתה

זה בונה של Spark Session.

getOrCreate()

הוא משמש כדי לקבל קיים SparkSession, או אם אין אחד קיים, צור אחד חדש על סמך האפשרויות שנקבעו ב-Builder.

כמה שיטות אחרות

כמה שיטות של PySpark SQL הן הבאות:

1. שם אפליקציה (שם)

הוא משמש להגדרת שם האפליקציה, אשר יוצג בממשק המשתמש של האינטרנט של Spark. הפרמטר שֵׁם מקבל את שם הפרמטר.

2. config(key=None, value = None, conf = None)

הוא משמש להגדרת אפשרות תצורה. אפשרויות שנקבעו בשיטה זו מופצות אוטומטית לשניהם SparkConf ו SparkSession התצורה של.

 from pyspark.conf import SparkConfSparkSession.builder.config(conf=SparkConf()) 

פרמטרים:

מה המידות של מסך המחשב שלי
    מַפְתֵחַ-מחרוזת שם מפתח של מאפיין תצורה.ערך-הוא מייצג את הערך של מאפיין תצורה.conf -מופע של SparkConf.

3. מאסטר (מאסטר)

הוא מגדיר את כתובת האתר הראשית של spark שאליו מתחבר, כגון 'מקומי' לרוץ מקומי, 'מקומי[4]' לרוץ מקומי עם 4 ליבות.

פרמטרים:

    לִשְׁלוֹט:כתובת אתר עבור spark master.

4. SparkSession.catalog

זהו ממשק שהמשתמש יכול ליצור, לשחרר, לשנות או לבצע שאילתות במסד הנתונים הבסיסיים, הטבלאות, הפונקציות וכו'.

5. SparkSession.conf

מחרוזת למספרים שלמים

זהו ממשק תצורת זמן ריצה עבור spark. זהו הממשק שדרכו המשתמש יכול לקבל ולהגדיר את כל תצורות Spark ו-Hadoop הרלוונטיות ל-Spark SQL.

class pyspark.sql.DataFrame

זהו אוסף מבוזר של נתונים המקובצים בעמודות עם שם. DataFrame דומה לטבלה ההתייחסותית ב-Spark SQL, ניתן ליצור באמצעות פונקציות שונות ב- SQLContext.

 student = sqlContext.read.csv('...') 

לאחר יצירת מסגרת הנתונים, אנו יכולים לתפעל אותו באמצעות מספר שפות ספציפיות לתחום (DSL) שהן פונקציות מוגדרות מראש של DataFrame. שקול את הדוגמה הבאה.

 # To create DataFrame using SQLContext student = sqlContext.read.parquet('...') department = sqlContext.read.parquet('...') student.filter(marks > 55).join(department, student.student_Id == department.id)  .groupBy(student.name, 'gender').({'name': 'student_Id', 'mark': 'department'}) 

הבה נבחן את הדוגמה הבאה:

שאילתות באמצעות Spark SQL

בקוד הבא, ראשית, אנו יוצרים DataFrame ומבצעים את שאילתות SQL כדי לאחזר את הנתונים. שקול את הקוד הבא:

 from pyspark.sql import * #Create DataFrame songdf = spark.read.csv(r'C:UsersDEVANSH SHARMA	op50.csv', inferSchema = True, header = True) #Perform SQL queries songdf.select('Genre').show() songdf.filter(songdf['Genre']=='pop').show() 

תְפוּקָה:

 +----------------+ | Genre| +----------------+ | canadian pop| | reggaeton flow| | dance pop| | pop| | dfw rap| | pop| | trap music| | pop| | country rap| | electropop| | reggaeton| | dance pop| | pop| | panamanian pop| |canadian hip hop| | dance pop| | latin| | dfw rap| |canadian hip hop| | escape room| +----------------+ only showing top 20 rows +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name| Artist.Name|Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 6|I Don't Care (wit...| Ed Sheeran| pop| 102| 68| 80| -5| 9| 84| 220| 9| 4| 84| | 8| How Do You Sleep?| Sam Smith| pop| 111| 68| 48| -5| 8| 35| 202| 15| 9| 90| | 13| Someone You Loved|Lewis Capaldi| pop| 110| 41| 50| -6| 11| 45| 182| 75| 3| 88| | 38|Antisocial (with ...| Ed Sheeran| pop| 152| 82| 72| -5| 36| 91| 162| 13| 5| 87| | 44| Talk| Khalid| pop| 136| 40| 90| -9| 6| 35| 198| 5| 13| 84| | 50|Cross Me (feat. C...| Ed Sheeran| pop| 95| 79| 75| -6| 7| 61| 206| 21| 12| 82| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ 

שימוש בפונקציה groupBy()

הפונקציה groupBy() אוספת את נתוני הקטגוריות הדומות.

 songdf.groupBy('Genre').count().show() 

תְפוּקָה:

 +----------------+-----+ | Genre|count| +----------------+-----+ | boy band| 1| | electropop| 2| | pop| 7| | brostep| 2| | big room| 1| | pop house| 1| | australian pop| 1| | edm| 3| | r&b en espanol| 1| | dance pop| 8| | reggaeton| 2| | canadian pop| 2| | trap music| 1| | escape room| 1| | reggaeton flow| 2| | panamanian pop| 2| | atl hip hop| 1| | country rap| 2| |canadian hip hop| 3| | dfw rap| 2| +----------------+-----+ 

הפצה (מספר מחיצות, *קולות)

ה הפצה() מחזירה DataFrame חדש שהוא ביטוי מחיצות. פונקציה זו מקבלת שני פרמטרים מחיצות מספר ו *קול. ה מחיצות מספר פרמטר מציין את מספר היעד של העמודות.

 song_spotify.repartition(10).rdd.getNumPartitions() data = song_spotify.union(song_spotify).repartition('Energy') data.show(5) 

תְפוּקָה:

 +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name|Artist.Name| Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| | 17| LA CANCI?N| J Balvin| latin| 176| 65| 75| -6| 11| 43| 243| 15| 32| 90| | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ only showing top 5 rows