不同api读取数据
- 1.SQLite
- 1.1 连接数据库
- 1.2 创建一个表
- 1.3 查询记录
- 2.mysql
- 2.1 数据库连接
- 2.2 创建数据库表
- 2.3 数据库插入操作
- 2.4 数据库查询操作
- 2.5 数据库更新操作
- 2.6 删除操作
- 2.7 执行事务
- 3.SQLAlchemy
- 3.1 架构
- 3.2 连接数据库
- 3.3 创建数据库
- 3.4 查询记录
- 4.PySpark
- 4.1 pyspark架构
- 4.2 RDD
- 4.2.1 报错问题
- 4.2.2 解决方法
- 1.下载
- 2.配置环境
- 4.3 DataFrame
- 4.4 Dataset
- 4.5 SparkSession和SparkContext
1.SQLite
SQLite是一种嵌入式数据库,它的数据库就是一个文件。由于SQLite本身是C写的。
在python中有内置的SQLite3,因此使用SQLite3是不需要安装任何东西,配置任何东西的。
在使用SQLite前,我们先要搞清楚几个概念:
表是数据库中存放关系数据的集合,一个数据库里面通常都包含多个表,比如学生的表,班级的表,学校的表,等等。表和表之间通过外键关联。
要操作关系数据库,首先需要连接到数据库,一个数据库连接称为Connection;
连接到数据库后,需要打开游标,称之为Cursor,通过Cursor执行SQL语句,然后,获得执行结果。
1.1 连接数据库
import sqlite3#如果不存在这个数据库,就会在当前目录下创建一个
conn = sqlite3.connect('test.db')
1.2 创建一个表
c = conn.cursor()#创建一个company表
c.execute('create table company (id varchar(20) primary key, name varchar(20))')
print "Table created successfully"
#插入一条记录
c.execute('insert into company (id,name) values (\'1\', \'Michael\')')
#关闭Cursor
c.close()
#提交事务
conn.commit()
#关闭connection
conn.close()
1.3 查询记录
conn = sqlite3.connect('test.db')cursor = conn.cursor()
# 执行查询语句:
cursor.execute('select * from user where id=?', ('1',))
# 获得查询结果集:
values = cursor.fetchall()
values
[('1', 'Michael')]
cursor.close()
conn.close()
2.mysql
MySQL是Web世界中使用最广泛的数据库服务器。SQLite的特点是轻量级、可嵌入,但不能承受高并发访问,适合桌面和移动应用。而MySQL是为服务器端设计的数据库,能承受高并发访问,同时占用的内存也远远大于SQLite。
在python也有对应的mysql api接口。pymysql包
pymysql跟sqllite操作类似,都是通过connect连接,创建操作游标cursor,执行sql语句execute。
2.1 数据库连接
import MySQLdb# 打开数据库连接
db = MySQLdb.connect("localhost", "testuser", "test123", "TESTDB", charset='utf8' )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# 使用execute方法执行SQL语句
cursor.execute("SELECT VERSION()")
# 使用 fetchone() 方法获取一条数据
data = cursor.fetchone()
print "Database version : %s " % data
# 关闭数据库连接
db.close()
2.2 创建数据库表
import MySQLdb# 打开数据库连接
db = MySQLdb.connect("localhost", "testuser", "test123", "TESTDB", charset='utf8' )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# 如果数据表已经存在使用 execute() 方法删除表。
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")
# 创建数据表SQL语句
sql = """CREATE TABLE EMPLOYEE (
FIRST_NAME CHAR(20) NOT NULL,
LAST_NAME CHAR(20),
AGE INT,
SEX CHAR(1),
INCOME FLOAT )"""
cursor.execute(sql)
# 关闭数据库连接
db.close()
2.3 数据库插入操作
import MySQLdb# 打开数据库连接
db = MySQLdb.connect("localhost", "testuser", "test123", "TESTDB", charset='utf8' )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 插入语句
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# Rollback in case there is any error
db.rollback()
# 关闭数据库连接
db.close()
2.4 数据库查询操作
Python查询Mysql使用 fetchone() 方法获取单条数据, 使用fetchall() 方法获取多条数据。
- fetchone(): 该方法获取下一个查询结果集。结果集是一个对象
- fetchall():接收全部的返回结果行.
- rowcount:
这是一个只读属性,并返回执行execute()方法后影响的行数。
# 打开数据库连接
db = MySQLdb.connect("localhost", "testuser", "test123", "TESTDB", charset='utf8' )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 查询语句
sql = "SELECT * FROM EMPLOYEE \
WHERE INCOME > %s" % (1000)
try:
# 执行SQL语句
cursor.execute(sql)
# 获取所有记录列表
results = cursor.fetchall()
for row in results:
fname = row[0]
lname = row[1]
age = row[2]
sex = row[3]
income = row[4]
# 打印结果
print "fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \
(fname, lname, age, sex, income )
except:
print "Error: unable to fecth data"
# 关闭数据库连接
db.close()
2.5 数据库更新操作
import MySQLdb# 打开数据库连接
db = MySQLdb.connect("localhost", "testuser", "test123", "TESTDB", charset='utf8' )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 更新语句
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')
try:
# 执行SQL语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 发生错误时回滚
db.rollback()
# 关闭数据库连接
db.close()
2.6 删除操作
import MySQLdb# 打开数据库连接
db = MySQLdb.connect("localhost", "testuser", "test123", "TESTDB", charset='utf8' )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 删除语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
try:
# 执行SQL语句
cursor.execute(sql)
# 提交修改
db.commit()
except:
# 发生错误时回滚
db.rollback()
# 关闭连接
db.close()
2.7 执行事务
# SQL删除记录语句sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
try:
# 执行SQL语句
cursor.execute(sql)
# 向数据库提交
db.commit()
except:
# 发生错误时回滚
db.rollback()
3.SQLAlchemy
SQLAlchemy 是 Python 中一个通过 ORM 操作数据库的框架。
SQLAlchemy对象关系映射器提供了一种方法,用于将用户定义的Python类与数据库表相关联,并将这些类(对象)的实例与其对应表中的行相关联。它包括一个透明地同步对象及其相关行之间状态的所有变化的系统,称为工作单元,以及根据用户定义的类及其定义的彼此之间的关系表达数据库查询的系统。
3.1 架构
3.2 连接数据库
连接 Mysql数据库
#!/usr/bin/env python# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://root:123@171.0.0.1:3306/dbname?charset=utf8mb4",
echo=True,
max_overflow=5)
echo 标志是设置SQLAlchemy日志记录的快捷方式。 启用它后,我们将看到所有生成的SQL。
Max_overflow 指定了连接池的最大连接数。
create_engine() 的返回值是一个实例引擎,它代表了一个数据库的核心接口。
3.3 创建数据库
from sqlalchemy import Column, String, create_enginefrom sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# 创建对象的基类:
Base = declarative_base()
# 定义User对象:
class User(Base):
# 表的名字:
__tablename__ = 'user'
# 表的结构:
id = Column(String(20), primary_key=True)
name = Column(String(20))
# 初始化数据库连接:
engine = create_engine('mysql+mysqlconnector://root:password@localhost:3306/test')
# 创建DBSession类型:
DBSession = sessionmaker(bind=engine)
添加一个user对象:
# 创建session对象:session = DBSession()
# 创建新User对象:
new_user = User(id='5', name='Bob')
# 添加到session:
session.add(new_user)
# 提交即保存到数据库:
session.commit()
# 关闭session:
session.close()
3.4 查询记录
# 创建Session:session = DBSession()
# 创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行:
user = session.query(User).filter(User.id=='5').one()
# 打印类型和对象的name属性:
print('type:', type(user))
print('name:', user.name)
# 关闭Session:
session.close()
4.PySpark
4.1 pyspark架构
和数据分析相关性比较高的是spark sql+dataframe,简单可以看成是sql和pandas的dataframe的结合,不过语句要更麻烦一些。
MLlib,spark的机器学习库,以rdd为基础的处理单元,而后来诞生的ml则是以dataframe为处理单元,因此更加方便高效。
4.2 RDD
弹性分布式数据集,是不可变Java虚拟机(JVM)对象的分布式集合,我们在使用pyspark的过程中,Python数据是存放在这些JVM对象中的。
import pysparkfrom pyspark import SparkContext as sc
from pyspark import SparkConf
conf = SparkConf().setAppName('test').setMaster('local[*]')
sc = sc.getOrCreate(conf)
data=sc.parallelize(c=[3,1,2,5,5],numSlices=5)