Lab 10 - Assignment 2

At the end of the year there are many departments of the company that request different data, both to perform analytics and to generate external reports.

The Data and Business Intelligence director has called his Data Engineering team and has listed the Dataset that they have to generate to respond to the different requests they have had.

To carry out the work, they have different text files ingested from the different databases of the company's management systems. These are:

  1. "transactions-asignment.csv". It is the file with all the transactions for year 2018 with columns: TransactionID, CustomerID, Date, Product, Group, Amount, PaymentType, Country.
  2. "customer-data-asignment.csv". It is the file with columns: CustomerID, CardNumber. It contains the relationship between the customer and his credit card identity.
  3. "country-vat.csv". It is the file with columns: VAT, Country. It contains the relationship between each Country and the Value Added Tax which is applied for the products.
  4. The products are labeled by a tag: A to J.
  5. The groups contains: 'Food', 'Leisure', 'Restaurant', 'Gym', 'Gambling', 'Travel', 'Learning'.
  6. The payment type contains: 'Visa', 'Cash', 'American', 'PayPal', 'Mastercard', 'Check'.
  7. The countries where the transactions come from are: 'Italy', 'France', 'Germany', 'USA', 'Brasil', 'UK', 'Switzerland', 'Sweden', 'Denmark', 'Canada', 'Mexico', 'Russia'.

The requested data reports are:

  1. The Product Marketing Department wants to analyze the monthly transactions by country respect to Group and respect to Product inside each Group. For this reason it need two DataFrames: one grouped by Group and other grouped by Product inside each Group. The columns for the first DataFrame should be: Country, Month, Group, TotalMonth, AvgMonth, Number of Transactions. And for the second one should be: Country, Month, Group, Product, TotalMonth, AvgMonth, Number of Transactions. How many records does each DataFrame?
  2. Could we say to Marketing Department which is the country and the month with the highest total month amount for Restaurant Group? How many transactions have been made for that Country and month?
  3. Could we say to Marketing Department which is the Product with the highest total month amount for Gambling Group and which is the Country and the month for that highest sale?
  4. The Financial Department needs to liquidate the VAT (the Amount for each transaction is gross, VAT for the country is included) for each country with the tax authority. Could we say to Financial Department which is the Country we should pay the bigest VAT amount and the amount? And which is the Country we should pay the smaller one and the amount?
  5. Also they need to know the total amount for type of payment (except the payments in cash) to liquidate with their own bank. Could we say to Financial Department which is the amount paid for every payment type?
  6. Who is our best customer and what is his/her credit card number?
  7. Calculate the total income coming from USA by type of payment and from outside USA.
  8. Which is the relationship between the income from USA and from outside USA for every type of payment.

Hint for 4: For the first Dataset about liquidate VAT you first shoul join the transactionsDF with countryVATDF, create a new column with the corresponding VAT for each transactions and after that to group by country and to agregate the sum on the new created column.

For the second Dataset we should first filter by payment type different than 'Cash' and after that to group by payment type and aggregate the sum on Amount.

In [31]:
import os
import sys

os.environ['SPARK_HOME'] = "/Users/furqan/Downloads/spark-2.3.2-bin-hadoop2.7"

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

#Add the following paths to the system path. Please check your installation
#to make sure that these zip files actually exist. The names might change
#as versions change.
sys.path.insert(0,os.path.join(SPARK_HOME,"python"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","py4j-0.10.7-src.zip"))

#Initialize SparkSession and SparkContext
from pyspark.sql import SparkSession
#from pyspark import SparkContext

#Create a Spark Session
MySparkSession = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("MiPrimer") \
    .config("spark.executor.memory", "6g") \
    .config("spark.cores.max","4") \
    .getOrCreate()


#Get the Spark Context from Spark Session    
MySparkContext = MySparkSession.sparkContext
In [32]:
MySparkContext
Out[32]:

SparkContext

Spark UI

Version
v2.3.2
Master
local[2]
AppName
MiPrimer
In [33]:
MySparkContext.getConf().getAll()
Out[33]:
[('spark.executor.memory', '6g'),
 ('spark.app.name', 'MiPrimer'),
 ('spark.app.id', 'local-1551284444798'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', '10.4.89.252'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '65511'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.master', 'local[2]'),
 ('spark.cores.max', '4')]

1. Creating the DataFrames

In [34]:
transactionDF = MySparkSession.read.csv("./transactions-asignment(2).csv",header=True,inferSchema=True)

customerDF = MySparkSession.read.csv("./customer-data-asignment(2).csv",header=True,inferSchema=True)

countryDF = MySparkSession.read.csv("./country-vat(3).csv",header=True,inferSchema=True)

transactionDF.show(10)
customerDF.show(10)
countryDF.show(10)
+--------------------+----------+-------------------+-------+----------+------+-----------+-----------+
|       TransactionID|CustomerID|               Date|Product|     Group|Amount|PaymentType|    Country|
+--------------------+----------+-------------------+-------+----------+------+-----------+-----------+
|7564e71a-3050-11e...|      4255|2018-07-18 02:36:00|      I|       Gym| 231.0|   American|     Sweden|
|7565ad98-3050-11e...|     30514|2018-03-08 07:37:00|      D|       Gym|  59.7|      Check|     Mexico|
|7565ad99-3050-11e...|      3853|2018-12-04 10:25:00|      B|    Travel|3782.0|       Cash|     Mexico|
|7565ad9a-3050-11e...|     49729|2018-04-12 19:23:00|      I|  Gambling| 231.0|      Check|      Italy|
|7565ad9b-3050-11e...|     33467|2018-02-01 23:17:00|      G|      Food|289.26| Mastercard|     Sweden|
|7565ad9c-3050-11e...|     45120|2018-09-08 11:21:00|      I|Restaurant|  23.1|      Check|         UK|
|7565ad9d-3050-11e...|      9965|2018-11-06 08:27:00|      C|    Travel|2590.0|      Check|Switzerland|
|7565ad9e-3050-11e...|     20404|2018-08-05 00:47:00|      J|   Leisure| 12.93| Mastercard|    Denmark|
|7565ad9f-3050-11e...|     46043|2018-08-04 05:07:00|      A|       Gym| 234.0|       Visa|     Brasil|
|7565ada0-3050-11e...|      7053|2018-02-23 07:59:00|      E|       Gym| 93.47|       Visa|    Denmark|
+--------------------+----------+-------------------+-------+----------+------+-----------+-----------+
only showing top 10 rows

+----------+--------------+
|CustomerID|    CardNumber|
+----------+--------------+
|         0|15056453071315|
|         1|15056453588315|
|         2|15056453027428|
|         3|15056453920562|
|         4|15056453139712|
|         5|15056453728136|
|         6|15056453811382|
|         7|15056453319623|
|         8|15056453773603|
|         9|15056453113544|
+----------+--------------+
only showing top 10 rows

+---+-----------+
|VAT|    Country|
+---+-----------+
| 21|      Italy|
| 19|     France|
| 18|    Germany|
| 20|        USA|
| 18|     Brasil|
| 18|         UK|
| 18|Switzerland|
| 18|     Sweden|
| 19|    Denmark|
| 21|     Canada|
+---+-----------+
only showing top 10 rows

First, we should generate two new columns for year and month for transactionDF

In [35]:
from pyspark.sql import functions as f

transactionDF = transactionDF.withColumn('Year',f.split(transactionDF.Date,'-')[0]) \
                            .withColumn('Month',f.split(transactionDF.Date,'-')[1])
transactionDF.show(10)
+--------------------+----------+-------------------+-------+----------+------+-----------+-----------+----+-----+
|       TransactionID|CustomerID|               Date|Product|     Group|Amount|PaymentType|    Country|Year|Month|
+--------------------+----------+-------------------+-------+----------+------+-----------+-----------+----+-----+
|7564e71a-3050-11e...|      4255|2018-07-18 02:36:00|      I|       Gym| 231.0|   American|     Sweden|2018|   07|
|7565ad98-3050-11e...|     30514|2018-03-08 07:37:00|      D|       Gym|  59.7|      Check|     Mexico|2018|   03|
|7565ad99-3050-11e...|      3853|2018-12-04 10:25:00|      B|    Travel|3782.0|       Cash|     Mexico|2018|   12|
|7565ad9a-3050-11e...|     49729|2018-04-12 19:23:00|      I|  Gambling| 231.0|      Check|      Italy|2018|   04|
|7565ad9b-3050-11e...|     33467|2018-02-01 23:17:00|      G|      Food|289.26| Mastercard|     Sweden|2018|   02|
|7565ad9c-3050-11e...|     45120|2018-09-08 11:21:00|      I|Restaurant|  23.1|      Check|         UK|2018|   09|
|7565ad9d-3050-11e...|      9965|2018-11-06 08:27:00|      C|    Travel|2590.0|      Check|Switzerland|2018|   11|
|7565ad9e-3050-11e...|     20404|2018-08-05 00:47:00|      J|   Leisure| 12.93| Mastercard|    Denmark|2018|   08|
|7565ad9f-3050-11e...|     46043|2018-08-04 05:07:00|      A|       Gym| 234.0|       Visa|     Brasil|2018|   08|
|7565ada0-3050-11e...|      7053|2018-02-23 07:59:00|      E|       Gym| 93.47|       Visa|    Denmark|2018|   02|
+--------------------+----------+-------------------+-------+----------+------+-----------+-----------+----+-----+
only showing top 10 rows

1. Datasets for Product Marketing Depatment

1.a Build the DataFrames

In [36]:
from pyspark.sql.functions import col

marketingDF1 = transactionDF.groupby('country','group','Month') \
                            .agg(f.count('TransactionID').alias('Number_of_Transactions'),f.sum('Amount').alias('Total_Month'),f.avg('Amount').alias('Average_Month'))
marketingDF2 = transactionDF.groupby('country','group','Month','Product') \
                            .agg(f.count('TransactionID').alias('Number_of_Transactions'),f.sum('Amount').alias('Total_Month'),f.avg('Amount').alias('Average_Month'))
In [37]:
marketingDF1.show(10)
marketingDF2.show(10)
+-----------+----------+-----+----------------------+------------------+------------------+
|    country|     group|Month|Number_of_Transactions|       Total_Month|     Average_Month|
+-----------+----------+-----+----------------------+------------------+------------------+
|    Germany|  Gambling|   10|                  1016|346706.39999999956|341.24645669291294|
|     Sweden|      Food|   10|                   937|248346.08999999898| 265.0438527214504|
|    Denmark|Restaurant|   10|                   989| 105503.5499999999|106.67699696663286|
|     Brasil|       Gym|   06|                   992|178634.63000000056|180.07523185483927|
|      Italy|  Learning|   09|                   954|          194822.0|204.21593291404614|
|        USA|       Gym|   05|                   982|181688.26000000053|185.01859470468486|
|     Russia|      Food|   08|                   968|257216.28999999881|265.71930785123845|
|     France|   Leisure|   10|                  1014| 41050.00000000013| 40.48323471400408|
|Switzerland|   Leisure|   06|                  1030| 43207.61000000009|41.949135922330186|
|      Italy|  Gambling|   01|                   991| 332869.9999999996| 335.8930373360238|
+-----------+----------+-----+----------------------+------------------+------------------+
only showing top 10 rows

+-----------+----------+-----+-------+----------------------+-----------------+-----------------+
|    country|     group|Month|Product|Number_of_Transactions|      Total_Month|    Average_Month|
+-----------+----------+-----+-------+----------------------+-----------------+-----------------+
|    Germany|   Leisure|   02|      F|                   100|4532.999999999997|45.32999999999997|
|     Russia|      Food|   12|      B|                   101|48298.20000000004|478.2000000000004|
|      Italy|Restaurant|   03|      C|                   105|          27195.0|            259.0|
|     Canada|       Gym|   05|      G|                   109|          31501.0|            289.0|
|     Mexico|  Gambling|   01|      G|                   108|          31212.0|            289.0|
|Switzerland|  Learning|   11|      B|                    87|          30189.0|            347.0|
|      Italy|    Travel|   06|      I|                   126|         291060.0|           2310.0|
|     France|    Travel|   12|      D|                   102|          60894.0|            597.0|
|        USA|   Leisure|   09|      G|                   108|3121.200000000002|28.90000000000002|
|     Brasil|    Travel|   05|      B|                   107|         404674.0|           3782.0|
+-----------+----------+-----+-------+----------------------+-----------------+-----------------+
only showing top 10 rows

1.b Number of records

In [38]:
print('Number of records for first DataFrame: ', marketingDF1.count())
print('Number of records for second DataFrame: ', marketingDF2.count())
Number of records for first DataFrame:  1008
Number of records for second DataFrame:  10080

2. Response to what is the country and the month with the highest "total month amount" for Group Restaurant

In [39]:
#    Country|Month|     Group|        TotalMonth|AvgMonth|Count|
marketingDF1.createOrReplaceTempView('Table1')
df2= MySparkSession.sql("Select * from Table1 where group='Restaurant' AND Total_Month=(Select max(Total_Month) from Table1 where group='Restaurant')")
df2.show()
+-------+----------+-----+----------------------+------------------+------------------+
|country|     group|Month|Number_of_Transactions|       Total_Month|     Average_Month|
+-------+----------+-----+----------------------+------------------+------------------+
|  Italy|Restaurant|   04|                  1059|118536.14999999997|111.93215297450422|
+-------+----------+-----+----------------------+------------------+------------------+

3. Response to what is the Product with the highest sale about Gambling and what is the Country and the month

In [40]:
#     Country|Month|   Group|Product|TotalMonth|AvgMonth|Count|
marketingDF2.createOrReplaceTempView('Table2')
df3=MySparkSession.sql("select * from Table2 where group='Gambling' AND Total_Month=(Select max(Total_Month) from Table2 where group='Gambling')")
df3.show()
+-------+--------+-----+-------+----------------------+-----------+-------------+
|country|   group|Month|Product|Number_of_Transactions|Total_Month|Average_Month|
+-------+--------+-----+-------+----------------------+-----------+-------------+
| Russia|Gambling|   10|      D|                   120|    71640.0|        597.0|
+-------+--------+-----+-------+----------------------+-----------+-------------+

4. Response to VAT to liquidate for each country

In [41]:
# Your code here

df4= transactionDF.withColumnRenamed('Country','CountryName')
df5= df4.join(countryDF,df4.CountryName==countryDF.Country,'left') \
        .drop('Country') 
df_VAT=df5.withColumn('VAT_Amount',df5.Amount-(df5.Amount/(1+(df5.VAT/100)))) \
          .groupBy('CountryName') \
          .agg(f.round(f.sum('VAT_Amount'),2).alias('VAT')) 
          
print('The country we have to pay the biggest VAT to is: ',df_VAT.orderBy(col('VAT').desc()).first())
print('The country we have to pay the smallest VAT to is: ',df_VAT.orderBy(col('VAT').asc()).first())
The country we have to pay the biggest VAT to is:  Row(CountryName='Russia', VAT=7968494.52)
The country we have to pay the smallest VAT to is:  Row(CountryName='UK', VAT=6947854.91)

5. Response to total amount for type of payment (except the payments in cash) to liquidate with their own bank

In [42]:
# Your code here
transactionDF.createOrReplaceTempView('Table3')
df6=MySparkSession.sql("Select * from Table3 where PaymentType != 'Cash'")

df7= df6.withColumnRenamed('Country','CountryName')
df8= df7.join(countryDF,df7.CountryName==countryDF.Country,'left') \
        .drop('Country') \
        .groupby('PaymentType') \
        .agg(f.format_number(f.sum('Amount'),2).alias('Total_Amount'))
df8.show(20)
+-----------+-------------+
|PaymentType| Total_Amount|
+-----------+-------------+
|       Visa|91,727,292.28|
|      Check|91,842,289.77|
| Mastercard|91,343,170.82|
|     PayPal|90,871,507.94|
|   American|90,893,043.24|
+-----------+-------------+

6. Response to Who is our best customer and what is his/her credit card number

In [43]:
# Your code here
df9= transactionDF.withColumnRenamed('CustomerID','Customer_ID')
df10= df9.join(customerDF,df9.Customer_ID==customerDF.CustomerID,'left')\
                   .drop('Customer_ID') \
                   .groupby('CustomerID','CardNumber')\
                   .agg(f.sum('Amount').alias('Amount'))\
                   .sort(col("Amount").desc())
df10.show(1)
+----------+--------------+--------+
|CustomerID|    CardNumber|  Amount|
+----------+--------------+--------+
|     14575|15056453501289|40981.03|
+----------+--------------+--------+
only showing top 1 row

7. Calculate the total income coming from USA by type of payment

In [44]:
# Your code here
transactionDF.createOrReplaceTempView('Table4')
df11= MySparkSession.sql("Select * from Table4 where Country = 'USA'")
df12= MySparkSession.sql("Select * from Table4 where Country != 'USA'")

df13= df11.groupby('country','PaymentType') \
          .agg(f.sum('Amount').alias('TotalIncome'))
df14= df12.groupby('PaymentType')\
                   .agg(f.sum('Amount').alias('Total_Income_From_Outside_USA'))

df13.show()
df14.show()
+-------+-----------+-----------------+
|country|PaymentType|      TotalIncome|
+-------+-----------+-----------------+
|    USA|     PayPal|7415358.010000042|
|    USA|       Cash|7636967.960000036|
|    USA| Mastercard| 7750294.77000005|
|    USA|   American|7497234.550000055|
|    USA|       Visa|7837635.690000074|
|    USA|      Check| 7827298.36000004|
+-------+-----------+-----------------+

+-----------+-----------------------------+
|PaymentType|Total_Income_From_Outside_USA|
+-----------+-----------------------------+
|       Visa|          8.388965658998962E7|
|      Check|          8.401499140998971E7|
| Mastercard|          8.359287604999016E7|
|     PayPal|          8.345614992998981E7|
|       Cash|          8.382591183998904E7|
|   American|           8.33958086899902E7|
+-----------+-----------------------------+

8. Which is the relationship between the income from USA and from outside USA for every type of payment.

In [45]:
# Your code here
df15= df14.withColumnRenamed("PaymentType","Payment_Type")
df16= df15.join(df13,df15.Payment_Type==df13.PaymentType,'left')\
          .drop('country','PaymentType')
df17 = df16.withColumn('Ratio',(df16.TotalIncome/df16.Total_Income_From_Outside_USA)*100) 
df17.show()
+------------+-----------------------------+-----------------+-----------------+
|Payment_Type|Total_Income_From_Outside_USA|      TotalIncome|            Ratio|
+------------+-----------------------------+-----------------+-----------------+
|        Visa|          8.388965658998962E7|7837635.690000074|9.342791481799107|
|       Check|          8.401499140998971E7| 7827298.36000004| 9.31654961648826|
|  Mastercard|          8.359287604999016E7| 7750294.77000005|9.271477590225778|
|      PayPal|          8.345614992998981E7|7415358.010000042|8.885334413606044|
|        Cash|          8.382591183998904E7|7636967.960000036|9.110509855923608|
|    American|           8.33958086899902E7|7497234.550000055|8.989941662259977|
+------------+-----------------------------+-----------------+-----------------+

In [ ]:
 
In [ ]:
 
In [ ]:
 
In [ ]:
 
In [46]:
MySparkContext.stop()
In [ ]: