当前位置 : 主页 > 编程语言 > python >

Python从门到精通(五):文件处理-05-二进制处理

来源:互联网 收集:自由互联 发布时间:2022-06-27
用二进制的目的主要是为了节省空间,将一些数据转为二进制形式,有时还需要定入元组中。 一、将python元组写入到二进制文件中 def write_records ( record_list , format , f ): """ Write a sequenc

用二进制的目的主要是为了节省空间:将一些数据转为二进制形式,有时还需要写入元组中。

一、将python元组写入到二进制文件中

def write_records(record_list, format, f):
    """
    Write a sequence of tuples to a binary file of structures.

    :param record_list: iterable of tuples; each tuple is unpacked into the
        positional arguments of ``Struct.pack``, so it must match ``format``.
    :param format: ``struct`` format string describing one record
        (for example ``'<idd'``).
    :param f: object with a ``write`` method that accepts bytes, such as a
        file opened in ``'wb'`` mode.
    :return: None
    """
    # Imported locally because nothing above this function brings ``Struct``
    # into scope; without it the first call fails with NameError.
    from struct import Struct

    # Compile the format once and reuse it for every record.
    record_struct = Struct(format)
    for r in record_list:
        f.write(record_struct.pack(*r))

# Example
if __name__ == '__main__':
    # Sample (int, float, float) records matching the '<idd' layout.
    records = [
        (1, 3.3, 7.5),
        (5, 9.8, 11.0),
        (16, 18.4, 66.7),
    ]
    # Binary write mode: the packed records are raw bytes.
    with open('data.b', 'wb') as f:
        write_records(records, '<idd', f)

二、读取二进制文件——以块的形式读取到元组中(注:本节下方的示例代码与第一节相同,仍是写入示例)

def write_records(record_list, format, f):
    """
    Write a sequence of tuples to a binary file of structures.

    :param record_list: iterable of tuples whose items match ``format``;
        each tuple becomes one packed record.
    :param format: ``struct`` format string for a single record.
    :param f: writable binary stream (anything with ``write(bytes)``).
    :return: None
    """
    # ``Struct`` is not imported anywhere before this definition, so bring
    # it into scope here to keep the function self-contained.
    from struct import Struct

    # Bind the compiled packer once; every record reuses it.
    pack_record = Struct(format).pack
    for r in record_list:
        f.write(pack_record(*r))

# Example
if __name__ == '__main__':
    # Each tuple is one record: a 4-byte int followed by two doubles.
    records = [
        (1, 3.3, 7.5),
        (5, 9.8, 11.0),
        (16, 18.4, 66.7),
    ]
    with open('data.b', 'wb') as f:
        write_records(records, '<idd', f)

三、读取二进制文件-字符串形式

from struct import Struct

def unpack_records(format, data):
    """
    Lazily decode fixed-size records from an in-memory bytes buffer.

    :param format: ``struct`` format string describing one record.
    :param data: bytes-like object holding the records back to back.
    :return: generator yielding one tuple per record, decoded in place
        with ``unpack_from`` (no intermediate slices are created).
    """
    layout = Struct(format)
    # Start offset of every record in the buffer.
    offsets = range(0, len(data), layout.size)
    return (layout.unpack_from(data, pos) for pos in offsets)


if __name__ == '__main__':
    # Read the whole file into memory first, then decode it record by record.
    with open('test.b', 'rb') as f:
        data = f.read()

    for rec in unpack_records('<idd', data):
        # Placeholder: each rec is one decoded tuple.
        pass

四、结构体处理

import struct
from struct import Struct

# Pre-compiled layout: one little-endian int followed by two doubles.
record_struct = Struct('<idd')
print(f'record struct size: {record_struct.size}')
print(f'pack: {record_struct.pack(1, 3.0, 6.0)}')

# The module-level function takes the format string on every call instead.
print(f"struct pack: {struct.pack('<idd', 1, 3.0, 6.0)}")

五、迭代器处理

# Fixed-size chunk iterator
# The context manager closes the file even if reading or printing fails;
# the original left the handle open.
with open('test.b', 'rb') as f:
    # iter(callable, sentinel) keeps calling f.read(20) until it returns b''
    # (end of file). 20 bytes is the size of one '<idd' record.
    chunks = iter(lambda: f.read(20), b'')
    print(chunks)
    for chk in chunks:
        print(f'chk is: {chk}')

#上面的迭代器方法等同于下面的两个方法之和
def read_records(format, f):
    """
    Yield records from a binary stream, reading one record at a time.

    :param format: ``struct`` format string describing one record.
    :param f: binary stream with a ``read(n)`` method.
    :return: generator of tuples, one per record; stops when ``read``
        returns ``b''``.
    """
    layout = Struct(format)
    # The two-argument iter() calls the lambda until it returns the b'' sentinel.
    for chunk in iter(lambda: f.read(layout.size), b''):
        yield layout.unpack(chunk)


def unpack_records(format, data):
    """
    Lazily decode fixed-size records by slicing an in-memory buffer.

    :param format: ``struct`` format string describing one record.
    :param data: sliceable bytes-like object holding the records back to back.
    :return: generator yielding one tuple per record-sized slice.
    """
    layout = Struct(format)
    width = layout.size
    starts = range(0, len(data), width)
    # Each record is cut out as its own slice before being unpacked.
    return (layout.unpack(data[begin:begin + width]) for begin in starts)

六、大量文件处理

from collections import namedtuple

# Give the tuple fields names
Record = namedtuple('Record', ['kind', 'x', 'y'])

# NOTE(review): the other sections read 'test.b'; confirm 'test.p' is the
# intended file name.
with open('test.p', 'rb') as f:
    # Lazy generator: nothing is read from f until it is iterated.
    record_list = (Record(*r) for r in read_records('<idd', f))

    # Consume the generator while the file is still open. Iterating after
    # the with block has exited reads from a closed file and raises
    # ValueError.
    for r in record_list:
        print(f'kind:{r.kind}, x: {r.x}, y: {r.y}')

# Processing large binary files
import numpy as np

# The context manager releases the file handle once the array is loaded;
# the original never closed it.
with open('test.b', 'rb') as f:
    # Structured dtype: little-endian int32 plus two little-endian float64
    # fields, i.e. the same layout as the '<idd' records.
    record_list = np.fromfile(f, dtype='<i,<d,<d')
print(f'records: {record_list}')
print(f'records 0: {record_list[0]}')
print(f'records 1: {record_list[1]}')

七、复杂文件处理(图片等)

# Sample polygons: each inner list is one polygon made of (x, y) points.
test_list = [
    [
        (1.0, 2.5),
        (3.5, 4.0),
        (2.5, 1.5),
    ],
    [
        (7.0, 1.2),
        (5.1, 3.0),
        (0.5, 7.5),
        (0.8, 9.0),
    ],
    [
        (3.4, 6.3),
        (1.2, 0.5),
        (4.6, 9.2),
    ],
]

import struct
import itertools

def write_polys(file_name, polys):
    """
    Write a list of polygons to a little-endian binary file.

    File layout: a 40-byte header packed as ``'<iddddi'`` (file code 0x1234,
    min_x, min_y, max_x, max_y, polygon count). Each polygon follows as a
    4-byte ``'<i'`` record length, which counts the length field itself,
    and then its points packed as ``'<dd'`` pairs.

    :param file_name: path of the file to create or overwrite.
    :param polys: sequence of polygons, each a sequence of (x, y) pairs.
    :raises ValueError: if ``polys`` contains no points at all, because the
        bounding box cannot be computed.
    """
    # Bounding box over every point of every polygon.
    points = list(itertools.chain.from_iterable(polys))
    xs = [px for px, _ in points]
    ys = [py for _, py in points]
    low_x, high_x = min(xs), max(xs)
    low_y, high_y = min(ys), max(ys)

    point_size = struct.calcsize('<dd')
    with open(file_name, 'wb') as f:
        header = struct.pack('<iddddi', 0x1234,
                             low_x, low_y,
                             high_x, high_y,
                             len(polys))
        f.write(header)
        for poly in polys:
            # Stored length = point bytes plus the 4-byte length field.
            record_len = len(poly) * point_size + 4
            f.write(struct.pack('<i', record_len))
            for pt in poly:
                f.write(struct.pack('<dd', *pt))

if __name__ == '__main__':
    # Serialize the sample polygons into test.bin when run as a script.
    write_polys(
        'test.bin',
        test_list,
    )

def read_polys(file_name):
    """
    Read polygons from a binary file with a 40-byte ``'<iddddi'`` header.

    Only the polygon count from the header is used; the file code and
    bounding box are decoded but ignored.

    :param file_name: path of the file to read.
    :return: list of polygons, each a list of ``(x, y)`` tuples.
    """
    with open(file_name, 'rb') as f:
        # The header's last field is the number of polygon records.
        *_, num_polys = struct.unpack('<iddddi', f.read(40))

        polys = []
        for _ in range(num_polys):
            (pbytes,) = struct.unpack('<i', f.read(4))
            # Integer division by the 16-byte point size gives the point
            # count; any remainder in pbytes is discarded.
            point_count = pbytes // 16
            polys.append([struct.unpack('<dd', f.read(16))
                          for _ in range(point_count)])
    return polys

"""=======更好的实现========"""
import struct

class StructField:
    """
    Descriptor representing a simple structure field.

    Reading the attribute on an instance decodes ``format`` from the
    instance's ``_buffer`` at ``offset``.
    """

    def __init__(self, format, offset):
        self.format, self.offset = format, offset

    def __get__(self, instance, cls):
        # Accessed on the class itself: hand back the descriptor object.
        if instance is None:
            return self
        values = struct.unpack_from(self.format, instance._buffer, self.offset)
        # A single-item format yields the bare value rather than a 1-tuple.
        if len(values) == 1:
            return values[0]
        return values

class Structure:
    """Base class that keeps the raw record bytes as a memoryview in ``_buffer``."""

    def __init__(self, byte_data):
        # memoryview gives a view of the bytes without copying them.
        view = memoryview(byte_data)
        self._buffer = view


class PolyHeader(Structure):
    """Polygon file header with every field given as an explicit (format, offset)."""

    # 4-byte little-endian int at the start of the buffer.
    file_code = StructField('<i', 0)

    # Bounding box: four consecutive 8-byte little-endian doubles.
    min_x = StructField('<d', 4)
    min_y = StructField('<d', 12)
    max_x = StructField('<d', 20)
    max_y = StructField('<d', 28)

    # 4-byte polygon count, completing the 40-byte header.
    num_polys = StructField('<i', 36)


# Read only the 40-byte header. The context manager closes the file, which
# the original left open; the header keeps its own copy of the bytes read.
with open('test.bin', 'rb') as f:
    phead = PolyHeader(f.read(40))
print(f'file code is: {phead.file_code == 0x1234}')
print(f'min x is: {phead.min_x}')
print(f'min y is: {phead.min_y}')
print(f'max x is: {phead.max_x}')
print(f'max y is: {phead.max_y}')
print(f'num polys is: {phead.num_polys}')

"""=======更更好的实现========"""
class StructureMeta(type):
    """
    Metaclass that automatically creates StructField descriptors.

    A class built with this metaclass may declare ``_fields_`` as a list of
    ``(format, field_name)`` pairs. Each pair becomes a ``StructField``
    attribute at the running byte offset, and the total size is stored on
    the class as ``struct_size``. A byte-order prefix on one field carries
    over to the following fields until another prefix appears.
    """

    def __init__(cls, cls_name, bases, cls_dict):
        super().__init__(cls_name, bases, cls_dict)
        fields = getattr(cls, '_fields_', [])
        byte_order = ''
        offset = 0
        for format, field_name in fields:
            # '=' (native byte order, standard sizes) is a valid struct prefix
            # too. Without it in this tuple an explicit '=' would not be
            # carried over to the fields that follow.
            if format.startswith(('<', '>', '!', '@', '=')):
                byte_order = format[0]
                format = format[1:]
            format = byte_order + format
            setattr(cls, field_name, StructField(format, offset))
            offset += struct.calcsize(format)
        setattr(cls, 'struct_size', offset)

class Structure(metaclass=StructureMeta):
    """Base class for records whose fields are generated from ``_fields_``."""

    def __init__(self, bytedata):
        # The buffer is stored as given; field descriptors decode from it.
        self._buffer = bytedata

    @classmethod
    def from_file(cls, f):
        """Read exactly ``struct_size`` bytes from ``f`` and wrap them in ``cls``."""
        raw = f.read(cls.struct_size)
        return cls(raw)


class PolyHeader(Structure):
    """Polygon file header declared through ``_fields_``."""

    # Only the first entry carries a byte-order prefix; the entries after it
    # are listed without one.
    _fields_ = [
        ('<i', 'file_code'),
        ('d', 'min_x'), ('d', 'min_y'),
        ('d', 'max_x'), ('d', 'max_y'),
        ('i', 'num_polys'),
    ]


# from_file reads exactly struct_size bytes. The context manager closes the
# file afterwards, which the original never did.
with open('test.bin', 'rb') as f:
    phead = PolyHeader.from_file(f)
print(f'file code is: {phead.file_code == 0x1234}')
print(f'min x is: {phead.min_x}')
print(f'min y is: {phead.min_y}')
print(f'max x is: {phead.max_x}')
print(f'max y is: {phead.max_y}')
print(f'num polys is: {phead.num_polys}')


class NestedStruct:
    """
    Descriptor representing a nested structure.

    On first access it slices the owner's ``_buffer`` at ``offset``, wraps
    the slice in ``struct_type``, and stores the result on the instance.
    """

    def __init__(self, name, struct_type, offset):
        self.name, self.struct_type, self.offset = name, struct_type, offset

    def __get__(self, instance, cls):
        # Accessed on the class itself: hand back the descriptor object.
        if instance is None:
            return self
        start = self.offset
        stop = start + self.struct_type.struct_size
        nested = self.struct_type(instance._buffer[start:stop])
        # This descriptor defines no __set__, so the instance attribute set
        # here shadows it and later lookups reuse the same object.
        setattr(instance, self.name, nested)
        return nested

class StructureMeta(type):
    """
    Metaclass that automatically creates StructField descriptors.

    ``_fields_`` is a list of ``(format, field_name)`` pairs. ``format`` is
    either a ``struct`` format string, which becomes a ``StructField``, or
    another class built with this metaclass, which becomes a
    ``NestedStruct``. Fields are laid out back to back and the total size is
    stored on the class as ``struct_size``. A byte-order prefix on a string
    format carries over to the string formats that follow it.
    """

    def __init__(cls, cls_name, bases, cls_dict):
        super().__init__(cls_name, bases, cls_dict)
        fields = getattr(cls, '_fields_', [])
        byte_order = ''
        offset = 0
        for format, field_name in fields:
            if isinstance(format, StructureMeta):
                # Nested structure: it occupies its own struct_size bytes.
                setattr(cls, field_name,
                        NestedStruct(field_name, format, offset))
                offset += format.struct_size
            else:
                # '=' (native byte order, standard sizes) is a valid struct
                # prefix too. Without it in this tuple an explicit '=' would
                # not be carried over to the fields that follow.
                if format.startswith(('<', '>', '!', '@', '=')):
                    byte_order = format[0]
                    format = format[1:]
                format = byte_order + format
                setattr(cls, field_name, StructField(format, offset))
                offset += struct.calcsize(format)
        setattr(cls, 'struct_size', offset)


class Point(Structure):
    """A pair of little-endian doubles exposed as ``x`` and ``y``."""

    _fields_ = [('<d', 'x'), ('d', 'y')]

class PolyHeader(Structure):
    """Polygon file header whose bounding box is two nested ``Point`` structures."""

    _fields_ = [
        ('<i', 'file_code'),
        # Nested structs: the bounding-box corners are Point records.
        (Point, 'min'), (Point, 'max'),
        ('i', 'num_polys'),
    ]


# The context manager closes the file once the header bytes are read; the
# original left the handle open.
with open('test.bin', 'rb') as f:
    phead = PolyHeader.from_file(f)
print(f'file code is: {phead.file_code == 0x1234}')
print(f'min is: {phead.min}')
# With the nested layout the header only has 'min' and 'max' Point fields.
# The coordinates live on those points, so the flat names min_x, min_y,
# max_x and max_y no longer exist and would raise AttributeError.
print(f'min x is: {phead.min.x}')
print(f'min y is: {phead.min.y}')
print(f'max x is: {phead.max.x}')
print(f'max y is: {phead.max.y}')
print(f'num polys is: {phead.num_polys}')


class SizedRecord:
    """A size-prefixed chunk of bytes that can be iterated as repeated records."""

    def __init__(self, bytedata):
        self._buffer = memoryview(bytedata)

    @classmethod
    def from_file(cls, f, size_fmt, includes_size=True):
        """
        Read one size-prefixed record from ``f``.

        :param f: binary stream positioned at the size prefix.
        :param size_fmt: ``struct`` format of the size prefix (e.g. ``'<i'``).
        :param includes_size: whether the stored size also counts the bytes
            of the prefix itself.
        :return: a new instance wrapping the payload bytes.
        """
        prefix_len = struct.calcsize(size_fmt)
        (total,) = struct.unpack(size_fmt, f.read(prefix_len))
        # When the stored size counts the prefix, take it off the payload length.
        payload_len = total - includes_size * prefix_len
        return cls(f.read(payload_len))

    def iter_as(self, code):
        """
        Iterate over the payload as a run of fixed-size items.

        :param code: a ``struct`` format string, which yields tuples, or a
            ``StructureMeta`` class, which yields instances of that class.
            Any other value yields nothing.
        """
        buf = self._buffer
        if isinstance(code, str):
            layout = struct.Struct(code)
            yield from (layout.unpack_from(buf, pos)
                        for pos in range(0, len(buf), layout.size))
        elif isinstance(code, StructureMeta):
            width = code.struct_size
            yield from (code(buf[pos:pos + width])
                        for pos in range(0, len(buf), width))


# Everything that touches the file happens inside the with block, so the
# handle is closed afterwards; the original never closed it.
with open('test.bin', 'rb') as f:
    phead = PolyHeader.from_file(f)
    print(f'num polys is: {phead.num_polys}')
    # One size-prefixed record per polygon; each reads its bytes right away.
    polydata = [SizedRecord.from_file(f, '<i') for _ in range(phead.num_polys)]
print(f'poly data: {polydata}')


# First pass: decode each record as raw (x, y) tuples.
for n, poly in enumerate(polydata):
    print(f'Polygon {n}')
    for p in poly.iter_as('<dd'):
        print(f'poly iter: {p}')

# Second pass: decode the same records as Point structures.
for n, poly in enumerate(polydata):
    print(f'Polygon {n}')
    for p in poly.iter_as(Point):
        print(f'p.x = {p.x}, p.y = {p.y}')


class Point(Structure):
    """Two little-endian doubles read as the ``x`` and ``y`` coordinates."""

    _fields_ = [
        ('<d', 'x'), ('d', 'y'),
    ]

class PolyHeader(Structure):
    """File header: file code, two nested ``Point`` corners, then the polygon count."""

    _fields_ = [
        ('<i', 'file_code'),
        (Point, 'min'), (Point, 'max'),
        ('i', 'num_polys'),
    ]

def read_polys(file_name):
    """
    Read polygons using the declarative structure classes.

    :param file_name: path of the polygon file to read.
    :return: list of polygons, each a list of ``(x, y)`` tuples.
    """
    with open(file_name, 'rb') as f:
        phead = PolyHeader.from_file(f)
        # Lazy on purpose: each record is read right before it is decoded,
        # and the list below is fully built before the file is closed.
        records = (SizedRecord.from_file(f, '<i')
                   for _ in range(phead.num_polys))
        return [[(p.x, p.y) for p in rec.iter_as(Point)]
                for rec in records]


class ShapeFile(Structure):
    """
    Header layout that mixes byte orders within a single structure.

    NOTE(review): presumably the header of an ESRI shapefile; confirm
    against the format specification.
    """

    _fields_ = [
        # Big endian
        ('>i', 'file_code'),
        ('20s', 'unused'),
        ('i', 'file_length'),
        # Little endian
        ('<i', 'version'),
        ('i', 'shape_type'),
        ('d', 'min_x'), ('d', 'min_y'),
        ('d', 'max_x'), ('d', 'max_y'),
        ('d', 'min_z'), ('d', 'max_z'),
        ('d', 'min_m'), ('d', 'max_m'),
    ]


上一篇:Python从门到精通(六):线程-01-线程
下一篇:没有了
网友评论