ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

A small instance of visual analytics basing Spark(Python)

2021-06-17 14:04:03  阅读:248  来源: 互联网

标签:instance Python delays airlines delay analytics airline csv row


A small instance of visual analytics basing Spark(Python)

The total delay time of the major airlines in a certain month


1.Preparation

1.1.Data

      This data set was downloaded from the U.S. Department of Transportation, Office of the Secretary of Research on November 30, 2014 and represents flight data for the domestic United States in April of 2014.

      The following CSV files are lookup tables:

    - airlines.csv

    - airports.csv

      And provided detailed information about the reference codes in the main data set. These files have header rows to identify their fields.

      The flights.csv contains flight statistics for April 2014 with the following fields:

- flight date     (yyyy-mm-dd)

- airline id      (lookup in airlines.csv)

- flight num

- origin          (lookup in airports.csv)

- destination     (lookup in airports.csv)

- departure time  (HHMM)

- departure delay (minutes)

- arrival time    (HHMM)

- arrival delay   (minutes)

- air time        (minutes)

- distance        (miles)

 

1.2.Codes—a basic template

     A basic template for writing a Spark application in Python is as follows:

## Spark Application - execute with spark-submit

## Imports

from pyspark import SparkConf, SparkContext

## Module Constants

APP_NAME = "My Spark Application"

## Closure Functions

## Main functionality

def main(sc):

    pass

if __name__ == "__main__":

    # Configure Spark

    conf = SparkConf().setAppName(APP_NAME)

    conf = conf.setMaster("local[*]")

    sc   = SparkContext(conf=conf)

    # Execute Main functionality

    main(sc)


 

1.3. Codes—a basic template

The entire app is as follows:

 
## Spark Application - execute with spark-submit
## Imports
import csv
import matplotlib.pyplot as plt
from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext
## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"
fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
            'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight   = namedtuple('Flight', fields)
## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """
    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])
def split(line):
    """
    Operator function for splitting a line with csv module
    """
    reader = csv.reader(StringIO(line))
    return reader.next()
def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes  = [d[1] for d in delays]
    index    = list(xrange(len(airlines)))
    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)
    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')
    # Set the ticks
    ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))
    # minimize chart junk
    plt.grid(axis = 'x', color ='white', linestyle='-')
    plt.title('Total Minutes Delayed per Airline')
    plt.show()
## Main functionality
def main(sc):
    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)
    # Read the CSV Data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
    # Map the total delay to the airline (joined using the broadcast value)
    delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                                     add(f.dep_delay, f.arv_delay)))
    # Reduce the total delay for the month to the airline
    delays  = delays.reduceByKey(add).collect()
    delays  = sorted(delays, key=itemgetter(1))
    # Provide output from the driver
    for d in delays:
        print "%0.0f minutes delayed\t%s" % (d[1], d[0])
    # Show a bar chart of the delays
    plot(delays)
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)
    # Execute Main functionality
    main(sc)

 

 

1.4. equipments

     A Ubuntu computer with spark、jdk、Scala

 

2.Steps

2.1.overview

data

codes

 

2.2. use the spark-submit command as follows:

 

2.3.result

2.4.Analysis

       what is this code doing? Let's look particularly at the main function which does the work most directly related to Spark. First, we load up a CSV file into an RDD, then map the split function to it. The split function parses each line of text using the csvmodule and returns a tuple that represents the row. Finally we pass the collect action to the RDD, which brings the data from the RDD back to the driver as a Python list. In this case, airlines.csv is a small jump table that will allow us to join airline codes with the airline full name. We will store this jump table as a Python dictionary and then broadcast it to every node in the cluster using sc.broadcast.

       Next, the main function loads the much larger flights.csv. After splitting the CSV rows, we map the parse function to the CSV row, which converts dates and times to Python dates and times, and casts floating point numbers appropriately. It also stores the row as a NamedTuple called Flight for efficient ease of use.

       With an RDD of Flight objects in hand, we map an anonymous function that transforms the RDD to a series of key-value pairs where the key is the name of the airline and the value is the sum of the arrival and departure delays. Each airline has its delay summed together using the reduceByKey action and the add operator, and this RDD is collected back to the driver (again the number airlines in the data is relatively small). Finally the delays are sorted in ascending order, then the output is printed to the console as well as visualized using matplotlib.

3.Q&A

3.1.ImportError: No module named matplotlib.pyplot

http://www.codeweblog.com/importerror-no-module-named-matplotlib-pyplot/

 

 

Note:

this demo came initially from the website

https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python

I also find the chinese version

http://blog.jobbole.com/86232/

data come from github

https://github.com/bbengfort/hadoop-fundamentals/blob/master/data/ontime.zip



标签:instance,Python,delays,airlines,delay,analytics,airline,csv,row
来源: https://blog.51cto.com/u_15077160/2915321

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有