At the end of the year, many departments of the company request different data, both to perform analytics and to generate external reports.
The Data and Business Intelligence director has called in the Data Engineering team and listed the datasets they have to generate to answer the different requests received.
To carry out the work, they have several text files ingested from the databases of the company's management systems. These are: a transactions file (transactions-asignment(2).csv), a customer data file (customer-data-asignment(2).csv), and a country VAT file (country-vat(3).csv).
The requested data reports are:
Hint for 4: For the first dataset, about the VAT to be settled, you should first join transactionsDF with countryVATDF, create a new column with the corresponding VAT for each transaction, and then group by country and aggregate the sum over the newly created column.
For the second dataset, we should first filter for payment types other than 'Cash', and then group by payment type and aggregate the sum over Amount.
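A minimal sketch of the hinted approach, assuming (as the hint's wording does) DataFrames named transactionsDF and countryVATDF with Country, Amount, PaymentType and VAT columns; the full solution cells appear further below:
from pyspark.sql import functions as f

# VAT dataset: join the VAT table, derive each transaction's VAT share, sum per country
vat_df = transactionsDF.join(countryVATDF, 'Country', 'left') \
    .withColumn('VAT_Amount', f.col('Amount') - f.col('Amount') / (1 + f.col('VAT') / 100)) \
    .groupBy('Country') \
    .agg(f.sum('VAT_Amount').alias('VAT'))

# Non-cash dataset: drop cash transactions, then total per payment type
noncash_df = transactionsDF.filter(f.col('PaymentType') != 'Cash') \
    .groupBy('PaymentType') \
    .agg(f.sum('Amount').alias('Total_Amount'))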
import os
import sys
os.environ['SPARK_HOME'] = "/Users/furqan/Downloads/spark-2.3.2-bin-hadoop2.7"
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']
# Add the following paths to the system path. Please check your installation
# to make sure that these zip files actually exist; the names might change
# as versions change.
sys.path.insert(0,os.path.join(SPARK_HOME,"python"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","py4j-0.10.7-src.zip"))
# Initialize SparkSession and SparkContext
from pyspark.sql import SparkSession

# Create a Spark Session
MySparkSession = SparkSession \
.builder \
.master("local[2]") \
.appName("MiPrimer") \
.config("spark.executor.memory", "6g") \
.config("spark.cores.max","4") \
.getOrCreate()
# Get the SparkContext from the SparkSession
MySparkContext = MySparkSession.sparkContext
MySparkContext
MySparkContext.getConf().getAll()
transactionDF = MySparkSession.read.csv("./transactions-asignment(2).csv", header=True, inferSchema=True)
customerDF = MySparkSession.read.csv("./customer-data-asignment(2).csv", header=True, inferSchema=True)
countryDF = MySparkSession.read.csv("./country-vat(3).csv", header=True, inferSchema=True)
transactionDF.show(10)
customerDF.show(10)
countryDF.show(10)
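Since inferSchema guesses the column types from the data, it can be worth verifying them before transforming anything:
# Check the types that inferSchema produced for each file
transactionDF.printSchema()
customerDF.printSchema()
countryDF.printSchema()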
First, we generate two new Year and Month columns for transactionDF by splitting the Date column.
from pyspark.sql import functions as f
# Split the Date string on '-' (assuming a YYYY-MM-DD format) to extract year and month
transactionDF = transactionDF.withColumn('Year', f.split(transactionDF.Date, '-')[0]) \
    .withColumn('Month', f.split(transactionDF.Date, '-')[1])
transactionDF.show(10)
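Note that f.split produces an array of strings, so Year and Month are string columns here; that is fine for grouping, but if numeric values were needed one could cast the split parts, for example (transactionDF_typed is an illustrative name):
# Optional: integer-typed Year and Month (assumes dates like 'YYYY-MM-DD')
transactionDF_typed = transactionDF.withColumn('Year', f.split(transactionDF.Date, '-')[0].cast('int')) \
    .withColumn('Month', f.split(transactionDF.Date, '-')[1].cast('int'))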
from pyspark.sql.functions import col
# Marketing dataset 1: transaction count, total and average amount per country, group and month
marketingDF1 = transactionDF.groupby('country', 'group', 'Month') \
    .agg(f.count('TransactionID').alias('Number_of_Transactions'),
         f.sum('Amount').alias('Total_Month'), f.avg('Amount').alias('Average_Month'))
# Marketing dataset 2: the same aggregates, additionally broken down by product
marketingDF2 = transactionDF.groupby('country', 'group', 'Month', 'Product') \
    .agg(f.count('TransactionID').alias('Number_of_Transactions'),
         f.sum('Amount').alias('Total_Month'), f.avg('Amount').alias('Average_Month'))
marketingDF1.show(10)
marketingDF2.show(10)
print('Number of records in the first DataFrame: ', marketingDF1.count())
print('Number of records in the second DataFrame: ', marketingDF2.count())
# Country | Month | Group | TotalMonth | AvgMonth | Count
marketingDF1.createOrReplaceTempView('Table1')
# Row(s) of Table1 with the highest Total_Month for the 'Restaurant' group
df2 = MySparkSession.sql("SELECT * FROM Table1 WHERE group='Restaurant' AND Total_Month=(SELECT max(Total_Month) FROM Table1 WHERE group='Restaurant')")
df2.show()
# Country | Month | Group | Product | TotalMonth | AvgMonth | Count
marketingDF2.createOrReplaceTempView('Table2')
# Row(s) of Table2 with the highest Total_Month for the 'Gambling' group
df3 = MySparkSession.sql("SELECT * FROM Table2 WHERE group='Gambling' AND Total_Month=(SELECT max(Total_Month) FROM Table2 WHERE group='Gambling')")
df3.show()
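The same lookups can be written with the DataFrame API instead of SQL; a sketch for the Restaurant case (limit(1) returns a single row, whereas the SQL version would return every row tied for the maximum):
# Highest Restaurant month via orderBy/limit instead of a subquery
marketingDF1.filter(col('group') == 'Restaurant') \
    .orderBy(col('Total_Month').desc()) \
    .limit(1) \
    .show()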
# First hinted dataset: VAT to settle per country
# Rename to avoid an ambiguous 'Country' column after the join
df4 = transactionDF.withColumnRenamed('Country', 'CountryName')
# Attach each country's VAT rate
df5 = df4.join(countryDF, df4.CountryName == countryDF.Country, 'left') \
    .drop('Country')
# VAT contained in a gross amount: Amount - Amount / (1 + VAT/100); sum it per country
df_VAT = df5.withColumn('VAT_Amount', df5.Amount - (df5.Amount / (1 + (df5.VAT / 100)))) \
    .groupBy('CountryName') \
    .agg(f.round(f.sum('VAT_Amount'), 2).alias('VAT'))
print('The country we have to pay the largest VAT to is: ', df_VAT.orderBy(col('VAT').desc()).first())
print('The country we have to pay the smallest VAT to is: ', df_VAT.orderBy(col('VAT').asc()).first())
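Note that .first() returns a whole Row object, which is what the prints above display. If only the country name and figure are wanted, the Row fields can be accessed by name:
top_row = df_VAT.orderBy(col('VAT').desc()).first()
print('Largest VAT to settle: ', top_row['CountryName'], top_row['VAT'])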
# Second hinted dataset: total amount per non-cash payment type
transactionDF.createOrReplaceTempView('Table3')
# Keep only the non-cash transactions
df6 = MySparkSession.sql("SELECT * FROM Table3 WHERE PaymentType != 'Cash'")
df7 = df6.withColumnRenamed('Country', 'CountryName')
# Join the VAT table and total the amounts per payment type
df8 = df7.join(countryDF, df7.CountryName == countryDF.Country, 'left') \
    .drop('Country') \
    .groupby('PaymentType') \
    .agg(f.format_number(f.sum('Amount'), 2).alias('Total_Amount'))
df8.show(20)
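Two caveats on the cell above: f.format_number returns a string, so Total_Amount cannot be sorted or aggregated numerically afterwards, and the country join does not affect this aggregation (assuming one VAT row per country). A simpler sketch that keeps the total numeric:
# Equivalent aggregation without the country join, keeping Total_Amount numeric
df6.groupby('PaymentType') \
    .agg(f.round(f.sum('Amount'), 2).alias('Total_Amount')) \
    .show(20)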
# The customer who has spent the most, together with their card number
# Rename to avoid an ambiguous 'CustomerID' column after the join
df9 = transactionDF.withColumnRenamed('CustomerID', 'Customer_ID')
# Attach the customer data, total the spend per customer and card, highest first
df10 = df9.join(customerDF, df9.Customer_ID == customerDF.CustomerID, 'left') \
    .drop('Customer_ID') \
    .groupby('CustomerID', 'CardNumber') \
    .agg(f.sum('Amount').alias('Amount')) \
    .sort(col('Amount').desc())
df10.show(1)
# Income per payment type, inside and outside the USA
transactionDF.createOrReplaceTempView('Table4')
df11 = MySparkSession.sql("SELECT * FROM Table4 WHERE Country = 'USA'")
df12 = MySparkSession.sql("SELECT * FROM Table4 WHERE Country != 'USA'")
# Income per payment type inside the USA
df13 = df11.groupby('country', 'PaymentType') \
    .agg(f.sum('Amount').alias('TotalIncome'))
# Income per payment type outside the USA
df14 = df12.groupby('PaymentType') \
    .agg(f.sum('Amount').alias('Total_Income_From_Outside_USA'))
df13.show()
df14.show()
# Ratio of USA income to income from outside the USA, per payment type
df15 = df14.withColumnRenamed('PaymentType', 'Payment_Type')
# Put the USA and non-USA totals side by side per payment type
df16 = df15.join(df13, df15.Payment_Type == df13.PaymentType, 'left') \
    .drop('country', 'PaymentType')
# USA income as a percentage of the income from outside the USA
df17 = df16.withColumn('Ratio', (df16.TotalIncome / df16.Total_Income_From_Outside_USA) * 100)
df17.show()
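Because df13 only contains payment types that actually occur in the USA, the left join leaves TotalIncome, and therefore Ratio, null for payment types used only abroad. If a 0% ratio is preferred in those cases, the gap could be filled before computing the ratio; a hypothetical refinement:
# Treat payment types absent from the USA as zero USA income (ratio becomes 0 instead of null)
ratio_filled = df16.na.fill({'TotalIncome': 0}) \
    .withColumn('Ratio', (col('TotalIncome') / col('Total_Income_From_Outside_USA')) * 100)
ratio_filled.show()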
MySparkContext.stop()