-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstream_app.py
134 lines (101 loc) · 4.84 KB
/
stream_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
import sys
import requests
from socket import *
import pandas as pd
def aggregate_tags_count(new_values, total_sum):
    """Fold the counts seen in one batch into a key's running total.

    Passed as the update function to ``updateStateByKey`` by the driver
    script in this file.

    Args:
        new_values: Iterable of counts observed for the key in the current
            batch; may be empty.
        total_sum: Total carried over from earlier batches. Any falsy value
            (such as ``None``) is treated as 0.

    Returns:
        The updated running total for the key.
    """
    return sum(new_values) + (total_sum or 0)
def get_sql_context_instance(spark_context):
    """Return the shared ``SQLContext``, creating it on the first call.

    The instance is cached in this module's globals under the key
    ``sqlContextSingletonInstance`` so that repeated calls reuse it instead
    of building a new context each time.

    Args:
        spark_context: Spark context handed to ``SQLContext`` when the cached
            instance does not exist yet; ignored on later calls.

    Returns:
        The cached ``SQLContext`` instance.
    """
    module_globals = globals()
    if 'sqlContextSingletonInstance' not in module_globals:
        module_globals['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return module_globals['sqlContextSingletonInstance']
def _save_as_single_csv(frame, path):
    """Overwrite *path* with *frame* as CSV (with header) from a single partition."""
    (frame.coalesce(1)
     .write.format('com.databricks.spark.csv')
     .mode('overwrite')
     .option("header", "true")
     .save(path))


def process_rdd(time, rdd):
    """Publish the top-10 hashtag, country and device counts for one batch.

    The RDD is turned into a DataFrame registered as the temp table
    ``hashtags``. Three queries are then run against it, each shown on the
    console and written to its own CSV output:

    * words starting with ``#``  -> ``hashtag_file.csv``
    * words starting with ``CC`` -> ``country_file.csv`` (as ``country_code``)
    * words starting with ``TS`` -> ``device_file.csv`` (as ``device``)

    Processing is best-effort: any ``Exception`` is reported on stderr and the
    function returns normally so that one bad batch does not stop the stream.

    Args:
        time: Batch time supplied by ``foreachRDD``; only used in log lines.
        rdd: RDD of ``(word, count)`` pairs.
    """
    print("----------- %s -----------" % str(time))
    try:
        # Get spark sql singleton context from the current context
        sql_context = get_sql_context_instance(rdd.context)
        print("Get spark sql singleton context from the current context ----------- %s -----------" % str(time))
        # Convert the RDD to a Row RDD, then to a DataFrame
        row_rdd = rdd.map(lambda w: Row(word=w[0], word_count=w[1]))
        hashtags_df = sql_context.createDataFrame(row_rdd)
        # Register the dataframe as a table so it can be queried with SQL
        hashtags_df.registerTempTable("hashtags")

        hashtag_counts_df = sql_context.sql(
            "select word , word_count from hashtags where word like '#%'order by word_count desc limit 10")
        hashtag_counts_df.show()
        print("show table 2")
        _save_as_single_csv(hashtag_counts_df, "hashtag_file.csv")

        country_counts_df = sql_context.sql(
            "select word as country_code, word_count as tweet_count from hashtags where word like 'CC%'order by word_count desc limit 10")
        country_counts_df.show()
        _save_as_single_csv(country_counts_df, "country_file.csv")

        device_df = sql_context.sql(
            "select word as device, word_count as device_count from hashtags where word like 'TS%'order by word_count desc limit 10")
        device_df.show()
        _save_as_single_csv(device_df, "device_file.csv")
    except Exception as exc:
        # Keep the stream alive, but never hide the failure.
        # NOTE(review): a batch with no rows is presumably the common cause
        # (schema inference in createDataFrame) - confirm against Spark docs.
        sys.stderr.write("Error while processing batch %s: %r\n" % (time, exc))
def process_rdd1(time, rdd):
    """Publish the keys of one batch of the per-window count stream.

    The RDD is turned into a DataFrame registered as the temp table
    ``number``; the ``word`` column is selected as ``num``, shown on the
    console and written to ``number_file.csv``.

    Processing is best-effort: any ``Exception`` is reported on stderr and the
    function returns normally so that one bad batch does not stop the stream.

    Args:
        time: Batch time supplied by ``foreachRDD``; only used in log lines.
        rdd: RDD of pairs whose first element becomes ``num``; the second
            element is loaded as ``word_count`` but not selected.
    """
    print("----------- %s -----------" % str(time))
    try:
        # Get spark sql singleton context from the current context
        sql_context = get_sql_context_instance(rdd.context)
        print("Get spark sql singleton context from the current context -----number------ %s -----------" % str(time))
        # Convert the RDD to a Row RDD, then to a DataFrame
        row_rdd = rdd.map(lambda w: Row(word=w[0], word_count=w[1]))
        counts_df = sql_context.createDataFrame(row_rdd)
        # Register the dataframe as a table so it can be queried with SQL
        counts_df.registerTempTable("number")
        number_df = sql_context.sql(
            "select word as num from number")
        number_df.show()
        # Finish the writer with .save(), like process_rdd does, so the
        # explicitly chosen format is the one actually used.
        (number_df.coalesce(1)
         .write.format('com.databricks.spark.csv')
         .mode('overwrite')
         .option("header", "true")
         .save("number_file.csv"))
    except Exception as exc:
        # Keep the stream alive, but never hide the failure.
        sys.stderr.write("Error while processing batch %s: %r\n" % (time, exc))
# Batch interval in seconds; the count window below uses the same value for
# both its length and its slide.
BATCH_INTERVAL_SECONDS = 30

# create spark configuration
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
# create spark context with the above configuration
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# create the Streaming Context from the above spark context with a
# BATCH_INTERVAL_SECONDS (30 second) batch interval
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)
# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_TwitterApp")
# read data from port 8080
dataStream = ssc.socketTextStream('127.0.0.1', 8080)
# count the lines received in each window
dataStream1 = dataStream.countByWindow(BATCH_INTERVAL_SECONDS, BATCH_INTERVAL_SECONDS)
print("Here!\n")
print("*********")
# split each tweet into words
words = dataStream.flatMap(lambda line: line.split(" "))
print("data!!!\n")
# map each word to a pair of (word, 1); process_rdd filters by prefix later
hashtags = words.map(lambda x: (x, 1))
# map each window count to a pair of (count, 1)
hashtags1 = dataStream1.map(lambda x: (x, 1))
# adding the count of each key to its last count
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
tags_totals1 = hashtags1.updateStateByKey(aggregate_tags_count)
# do processing for each RDD generated in each interval
tags_totals.foreachRDD(process_rdd)
tags_totals1.foreachRDD(process_rdd1)
# start the streaming computation
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()