import sys

def mapper(line):
    """Emit (word, 1) for every word in the line."""
    for word in line.split():
        yield (word, 1)

# Reducer.py
def reducer(key, values):
    """Sum the counts collected for one word."""
    yield (key, sum(values))

# Driver.py
import sys
from itertools import groupby
from operator import itemgetter
from Mapper import mapper
from Reducer import reducer

if __name__ == '__main__':
    # Map phase: turn every input line into (word, 1) pairs.
    pairs = [pair for line in sys.stdin for pair in mapper(line)]
    # Shuffle phase: sort so pairs with the same word are adjacent
    # (sys.stdin cannot be rewound, so the pairs are buffered in memory).
    pairs.sort(key=itemgetter(0))
    # Reduce phase: sum the counts for each word.
    for word, group in groupby(pairs, key=itemgetter(0)):
        for key, count in reducer(word, (value for _, value in group)):
            sys.stdout.write(f'{key}\t{count}\n')
```
The code above is a simple Word Count example that uses the MapReduce model to compute word frequencies over text data.
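The map/shuffle/reduce pipeline above can be exercised end to end without a Hadoop cluster. The sketch below re-implements the mapper and reducer inline so it is self-contained; the `word_count` helper and the sample sentences are illustrative, not part of the original code:

```python
from itertools import groupby
from operator import itemgetter

def mapper(line):
    # Map phase: emit (word, 1) for every word.
    for word in line.split():
        yield (word, 1)

def reducer(key, values):
    # Reduce phase: sum the counts for one word.
    yield (key, sum(values))

def word_count(lines):
    # Simulate the shuffle: sort pairs so equal keys become adjacent.
    pairs = sorted((p for line in lines for p in mapper(line)),
                   key=itemgetter(0))
    result = {}
    for word, group in groupby(pairs, key=itemgetter(0)):
        for key, count in reducer(word, (v for _, v in group)):
            result[key] = count
    return result

print(word_count(["hello world", "hello hadoop"]))
# → {'hadoop': 1, 'hello': 2, 'world': 1}
```

On a real cluster, the sort-based shuffle here is performed by the framework between the map and reduce phases.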
4.2 Hadoop Distributed File System (HDFS) Code Example
```python
# Driver.py
from hdfs import InsecureClient

# Connect to the NameNode's WebHDFS endpoint (Hadoop 2.x default port 50070).
client = InsecureClient('http://localhost:50070', user='root')

def upload_file(local_path, hdfs_path):
    """Copy a local file into HDFS."""
    client.upload(hdfs_path, local_path, overwrite=True)

def download_file(hdfs_path, local_path):
    """Copy a file from HDFS back to the local filesystem."""
    client.download(hdfs_path, local_path, overwrite=True)

if __name__ == '__main__':
    upload_file('data.txt', '/user/root/data.txt')
    download_file('/user/root/data.txt', 'data_downloaded.txt')
```
The code above is a simple HDFS upload and download example that uses the Hadoop Distributed File System (HDFS) client API to copy a local file into HDFS and back.
4.3 Apache Hive Code Example
```sql
-- Create the table
CREATE TABLE IF NOT EXISTS users (
    id INT,
    name STRING,
    age INT
);

-- Insert data
INSERT INTO TABLE users VALUES (1, 'Alice', 25);
INSERT INTO TABLE users VALUES (2, 'Bob', 30);
INSERT INTO TABLE users VALUES (3, 'Charlie', 35);

-- Query data
SELECT * FROM users WHERE age > 30;
```
The code above is a simple Apache Hive example that demonstrates Hive's basic features: creating a table, inserting data, and querying it.
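For readers without a Hive deployment, the semantics of the final SELECT can be sanity-checked in plain Python; the tuples below mirror the three INSERT statements (this is an illustration of the query's result, not part of the Hive API):

```python
# Rows matching the three INSERT statements in the Hive example.
users = [
    (1, 'Alice', 25),
    (2, 'Bob', 30),
    (3, 'Charlie', 35),
]

# Equivalent of: SELECT * FROM users WHERE age > 30;
result = [row for row in users if row[2] > 30]
print(result)  # → [(3, 'Charlie', 35)]
```

Note that `age > 30` is a strict comparison, so Bob (age 30) is excluded.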
4.4 Apache Pig Code Example
```pig
-- Driver.pig
-- (The original Python wrapper used an undocumented API; the same pipeline
-- is expressed here directly in Pig Latin, Pig's native language.)

-- Load: one record per line (schema assumed: a single name field)
data = LOAD 'data.txt' USING PigStorage('\t') AS (name:chararray);

-- Filter: keep only records that mention Alice
filtered = FILTER data BY name MATCHES '.*Alice.*';

-- Group by name and count the records in each group
grouped = GROUP filtered BY name;
counts = FOREACH grouped GENERATE group AS name, COUNT(filtered) AS cnt;

DUMP counts;
```
The code above is a simple Apache Pig example that demonstrates Pig's basic features by loading, filtering, and grouping data.
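The filter-then-group-and-count pipeline can likewise be sanity-checked in plain Python; the sample records below are illustrative stand-ins for data.txt:

```python
from collections import Counter

# Sample input lines, standing in for data.txt.
lines = [
    "Alice bought apples",
    "Bob bought pears",
    "Alice sold pears",
]

# FILTER step: keep lines that mention Alice.
filtered = [line for line in lines if 'Alice' in line]

# GROUP ... COUNT step: count records per first word (the name field).
counts = Counter(line.split()[0] for line in filtered)
print(dict(counts))  # → {'Alice': 2}
```

Pig evaluates the same pipeline lazily and in parallel across the cluster, but the per-record semantics match this sequential version.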
5. Future Trends and Challenges