scala 日志处理实例

日志格式

100.97.15.241 - - [19/Aug/2016:11:05:47 +0800] "GET /view.php HTTP/1.0" 200 0 "http://www.google.cn/search?q=hadoop" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)" "-"

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 1. Total PV (page views): each log line counts as one page view, so the
//    record count of the stream is the total.
val totalPv = lines.count()
totalPv.print()

// 2. PV per IP, highest PV first.
// The client IP is the first space-separated field of a log line.
val pvByIp = lines
  .map(line => (line.split(" ")(0), 1))
  .reduceByKey(_ + _)

// Sorting is done on each batch's RDD inside transform, ordering the
// (ip, pv) pairs by their count in descending order.
pvByIp
  .transform(rdd => rdd.sortBy(_._2, ascending = false))
  .print()

// 3. Search-engine PV
// Splitting a log line on double quotes puts the referer at index 3.
// Lines with fewer quote-delimited fields yield an empty referer instead of
// throwing ArrayIndexOutOfBoundsException.
val refer = lines.map { line =>
  val fields = line.split("\"")
  if (fields.length > 3) fields(3) else ""
}

// Emit (host, keyword) once per record so that the search-engine PV and the
// keyword PV can both be derived from this stream without re-parsing.
//   - referer is not a known search engine        -> ("", "")
//   - known search engine, no usable keyword      -> (host, "")
//   - known search engine with a keyword          -> (host, keyword)
val searchEnginInfo = refer.map { r =>
  // Name of the query-string parameter that carries the keyword, per host.
  val searchEngines = Map(
    "www.google.cn" -> "q",
    "www.yahoo.com" -> "p",
    "cn.bing.com" -> "q",
    "www.baidu.com" -> "wd",
    "www.sogou.com" -> "query"
  )

  // "http://host/path" splits on '/' into ("http:", "", host, ...),
  // so the host sits at index 2.
  val f = r.split('/')
  val host = if (f.length > 2) f(2) else ""

  searchEngines.get(host) match {
    case Some(key) =>
      // A referer without '?' has no query string; treat it as empty rather
      // than indexing past the end of the split result.
      val parts = r.split('?')
      val query = if (parts.length > 1) parts(1) else ""

      // First "key=value" pair for this engine's keyword parameter.
      // split drops trailing empty strings, so "q=" yields a single-element
      // array; that case is reported as an empty keyword.
      val keyword = query
        .split('&')
        .find(_.startsWith(key + "="))
        .map(_.split('='))
        .collect { case kv if kv.length > 1 => kv(1) }
        .getOrElse("")

      (host, keyword)

    case None =>
      ("", "")
  }
}

// Search-engine PV: keep only records with a recognised engine host,
// then count records per host.
searchEnginInfo
  .filter { case (host, _) => host.nonEmpty }
  .map { case (host, _) => (host, 1) }
  .reduceByKey(_ + _)
  .print()


// 4. Keyword PV: keep only records that carry a search keyword,
//    then count records per keyword.
searchEnginInfo
  .filter { case (_, keyword) => keyword.nonEmpty }
  .map { case (_, keyword) => (keyword, 1) }
  .reduceByKey(_ + _)
  .print()


// 5. PV per terminal type
// Splitting a log line on double quotes puts the user agent at index 5.
// Lines without that field are treated as having an empty user agent (and
// therefore counted as "Default") instead of throwing
// ArrayIndexOutOfBoundsException.
lines.map { line =>
  val fields = line.split("\"")
  val agent = if (fields.length > 5) fields(5) else ""

  val types = Seq("iPhone", "Android")
  // When several markers occur in the user agent, the one listed last in
  // `types` wins; with no marker at all the terminal is "Default".
  val terminal = types.filter(t => agent.contains(t)).lastOption.getOrElse("Default")

  (terminal, 1)
}.reduceByKey(_ + _).print()


// 6. PV per page
// Splitting a log line on double quotes puts the request ("GET /view.php HTTP/1.0")
// at index 1, and the page path is its second space-separated token.
// Lines without a quoted request, or whose request has no second token
// (e.g. "-"), are skipped instead of throwing ArrayIndexOutOfBoundsException.
lines
  .map(_.split("\""))
  .filter(_.length > 1)
  .map(fields => fields(1).split(" "))
  .filter(_.length > 1)
  .map(request => (request(1), 1))
  .reduceByKey(_ + _)
  .print()