qidao123.com ToB IT社区-企服评测·应用市场

 找回密码
 立即注册

角逐选题 python区块链实现 - proof of work工作量证明共识算法

[复制链接]
发表于 2026-2-10 20:11:09 | 显示全部楼层 |阅读模式
0 前言

🔥 优质角逐项目系列,本日要分享的是
python区块链实现 - proof of work工作量证明共识算法
该项目较为新奇,适相助为角逐课题方向,学长非常保举!
🧿 更多资料, 项目分享:
https://gitee.com/dancheng-senior/postgraduate

1 区块链底子

学长以比特币的结构向各人详解区块链的构成部门
1.1 比特币内部结构



  • previous hash(前一个区块的hash)
  • merkle root(默克尔树根节点,内部存储买卖业务数据)
  • timestamp(当前区块天生的时间)
  • nonce(旷工盘算hash值次数)

1.2 实现的区块链数据结构



  • index 当前第几个区块
  • timestamp 该区块创建时的时间戳
  • data 买卖业务信息
  • previousHash 前一个区块的hash
  • hash 当前区块的hash
1.3 注意点

第一个区块叫做创世区块(genesis block),区块链创建的时间默认生产的这里用的是单纯的链表,不是用默克尔树存储
示例代码

  1.     from hashlib import sha256
  2.     //区块schema
  3.     class Block:
  4.          
  5.         def __init__(self,index,timestamp,data,previousHash=""):
  6.             
  7.             self.index = index
  8.             self.timestamp = timestamp
  9.             self.data = data
  10.             self.previousHash = previousHash
  11.             self.hash = self.calculateHash()
  12.             
  13.         //计算当前区块的hash值
  14.         def calculateHash(self):
  15.             plainData = str(self.index)+str(self.timestamp)+str(self.data)
  16.             return sha256(plainData.encode('utf-8')).hexdigest()
  17.         
  18.         def __str__(self):
  19.             return str(self.__dict__)
  20.      //区块链schema
  21.     class BlockChain:
  22.         //初始化的时候 创建 创世区块
  23.         def __init__(self):
  24.             self.chain = [self.createGenesisBlock()]
  25.         //构建创世区块
  26.         def createGenesisBlock(self):
  27.             return Block(0,"01/01/2018","genesis block","0")
  28.         //获取最后一个区块
  29.         def getLatestBlock(self):
  30.             return self.chain[len(self.chain)-1]
  31.         //往区块链里面添加区块
  32.         def addBlock(self,newBlock):
  33.             newBlock.previousHash = self.getLatestBlock().hash
  34.             newBlock.hash = newBlock.calculateHash()
  35.             self.chain.append(newBlock)
  36.         
  37.         def __str__(self):
  38.             return str(self.__dict__)   
  39.         //校验区块链是不是有效的 有没有人被篡改
  40.         def chainIsValid(self):
  41.             for index in range(1,len(self.chain)):
  42.                 currentBlock = self.chain[index]
  43.                 previousBlock = self.chain[index-1]
  44.                 if (currentBlock.hash != currentBlock.calculateHash()):
  45.                     return False
  46.                 if previousBlock.hash != currentBlock.previousHash:
  47.                     return False
  48.             return True
  49. myCoin = BlockChain()
  50. myCoin.addBlock(Block(1,"02/01/2018","{amount:4}"))
  51. myCoin.addBlock(Block(2,"03/01/2018","{amount:5}"))
  52. #print block info 打印区块链信息
  53. print("print block info ####:")
  54. for block in myCoin.chain:
  55.     print(block)
  56. #check blockchain is valid 检查区块链是不是有效的
  57. print("before tamper block,blockchain is valid ###")
  58. print(myCoin.chainIsValid())
  59. #tamper the blockinfo  篡改区块2的数据
  60. myCoin.chain[1].data = "{amount:1002}"
  61. print("after tamper block,blockchain is valid ###")
  62. print(myCoin.chainIsValid())
复制代码
输出效果

  1. print block info ####:
  2. {'index': 0, 'timestamp': '01/01/2018', 'data': 'genesis block', 'previousHash': '0', 'hash': 'd8d21e5ba33780d5eb77d09d3b407ceb8ade4e5545ef951de1997b209d91e264'}
  3. {'index': 1, 'timestamp': '02/01/2018', 'data': '{amount:4}', 'previousHash': 'd8d21e5ba33780d5eb77d09d3b407ceb8ade4e5545ef951de1997b209d91e264', 'hash': '15426e32db30f4b26aa719ba5e573f372f41e27e4728eb9e9ab0bea8eae63a9d'}
  4. {'index': 2, 'timestamp': '03/01/2018', 'data': '{amount:5}', 'previousHash': '15426e32db30f4b26aa719ba5e573f372f41e27e4728eb9e9ab0bea8eae63a9d', 'hash': '75119e897f21c769acee6e32abcefc5e88e250a1f35cc95946379436050ac2f0'}
  5. before tamper block,blockchain is valid ###
  6. True
  7. after tamper block,blockchain is valid ###
  8. False
复制代码
1.4 区块链的焦点-工作量证明算法

上面学长先容了区块链的根本结构,我在之前的底子上来简朴实现一下工作量证明算法(proof of
work),在先容pow之前先思索一下为什么要工作量证明算法,大概再往前想一步为什么比特币怎样办理信托的题目?
1.4.1 拜占庭将军题目

比特币出现之前就有了拜占庭将军题目,重要头脑是,如安在分布式体系环境里去信赖其他人发给你的信息?
一组拜占庭将军分别各向导一支部队共同围困一座都会。为了简化题目,将各支部队的举措计谋限定为打击或撤离两种。由于部门部队打击部门部队撤离大概会造成灾难性结果,因此各位将军必须通过投票来告竣同等计谋,即全部部队一起打击或全部部队一起撤离。由于各位将军分处都会差别方向,他
体系的题目在于,将军中大概出现叛徒,他们不光大概向较为糟糕的计谋投票,还大概选择性地发送投票信息。假设有9位将军投票,此中1名叛徒。8名忠诚的将军中出现了4人投打击,4人投撤离的环境。这时间叛徒大概故意给4名投打击的将领送信表现投票打击,而给4名投撤离的将领送信表现投撤离。如许一来在4名投打击的将领看来,投票效果是5人投打击,从而发起打击;而在4名投撤离的将军看来则是5人投撤离。如许各支部队的同等协同就遭到了粉碎。

1.4.2 办理办法

拜占庭将军题目重要题目是,中央人可以拦截消息,举行修改;上述的那些士兵可以明确成比特币中的一些节点,不是全部节点拿到消息后都是可以直接处理惩罚的,先去办理一个数学题目,就是工作量证明,只有拥有特定的盘算本领办理了题目之后才气去修改大概校验(验证,打包,上链)。

上图就是简朴的工作量证明算法流程,一串数字反面有个x,x之前的数可以明确成买卖业务数据,然后必要找到一个x,让整个数的hash值的开头有n个0,如果hash是很匀称的话,那么天生的hash值每一位为0大概1都是等大概的,以是前n个都为0的概率就是2的n次方/2的hash值位数,上图给出了如果hash值是5个bit的环境下的全部大概
1.4.3 代码实现


  1.     from hashlib import sha256
  2.     import time
  3.     class Block:
  4.          
  5.         def __init__(self,index,timestamp,data,previousHash=""):
  6.             
  7.             self.index = index
  8.             self.timestamp = timestamp
  9.             self.data = data
  10.             self.previousHash = previousHash
  11.             self.nonce = 0 //代表当前计算了多少次hash计算
  12.             self.hash = self.calculateHash()
  13.         def calculateHash(self):
  14.             plainData = str(self.index)+str(self.timestamp)+str(self.data)+str(self.nonce)
  15.             return sha256(plainData.encode('utf-8')).hexdigest()
  16.         #挖矿 difficulty代表复杂度 表示前difficulty位都为0才算成功
  17.         def minerBlock(self,difficulty):
  18.             while(self.hash[0:difficulty]!=str(0).zfill(difficulty)):
  19.                 self.nonce+=1
  20.                 self.hash = self.calculateHash()
  21.         
  22.         def __str__(self):
  23.             return str(self.__dict__)
  24.     class BlockChain:
  25.         
  26.         def __init__(self):
  27.             self.chain = [self.createGenesisBlock()]
  28.             self.difficulty = 5
  29.    
  30.         def createGenesisBlock(self):
  31.             return Block(0,"01/01/2018","genesis block")
  32.         
  33.         def getLatestBlock(self):
  34.             return self.chain[len(self.chain)-1]
  35.         #添加区块前需要 做一道计算题😶,坐完后才能把区块加入到链上
  36.         def addBlock(self,newBlock):
  37.             newBlock.previousHash = self.getLatestBlock().hash
  38.             newBlock.minerBlock(self.difficulty)
  39.             self.chain.append(newBlock)
  40.         def __str__(self):
  41.             return str(self.__dict__)   
  42.         
  43.         def chainIsValid(self):
  44.             for index in range(1,len(self.chain)):
  45.                 currentBlock = self.chain[index]
  46.                 previousBlock = self.chain[index-1]
  47.                 if (currentBlock.hash != currentBlock.calculateHash()):
  48.                     return False
  49.                 if previousBlock.hash != currentBlock.previousHash:
  50.                     return False
  51.             return True
  52.            
  53.     myCoin = BlockChain()
  54.    
  55.     # 下面打印了每个区块挖掘需要的时间 比特币通过一定的机制控制在10分钟出一个块
  56.     # 其实就是根据当前网络算力 调整我们上面difficulty值的大小,如果你在
  57.     # 本地把上面代码difficulty的值调很大你可以看到很久都不会出计算结果
  58.     startMinerFirstBlockTime = time.time()
  59.     print("start to miner first block time :"+str(startMinerFirstBlockTime))
  60.    
  61.     myCoin.addBlock(Block(1,"02/01/2018","{amount:4}"))
  62.    
  63.     print("miner first block time completed" + ",used " +str(time.time()-startMinerFirstBlockTime) +"s")
  64.    
  65.     startMinerSecondBlockTime = time.time()
  66.    
  67.     print("start to miner first block time :"+str(startMinerSecondBlockTime))
  68.    
  69.     myCoin.addBlock(Block(2,"03/01/2018","{amount:5}"))
  70.    
  71.     print("miner second block time completed" + ",used " +str(time.time()-startMinerSecondBlockTime) +"s\n")
  72.    
  73.     #print block info
  74.     print("print block info ####:\n")
  75.     for block in myCoin.chain:
  76.         print("\n")
  77.         print(block)
  78.         
  79.     #check blockchain is valid
  80.     print("before tamper block,blockchain is valid ###")
  81.     print(myCoin.chainIsValid())
  82.    
  83.     #tamper the blockinfo
  84.     myCoin.chain[1].data = "{amount:1002}"
  85.     print("after tamper block,blockchain is valid ###")
  86.     print(myCoin.chainIsValid())
复制代码
输出

2 快速实现一个区块链

2.1 什么是区块链

区块链是一个不可变得,有序的被称之为块的纪录链,它们可以包罗买卖业务、文件大概任何你喜欢的数据,但最告急的是,它们用hash毗连在一起。
2.2 一个完备的快包罗什么

一个索引,一个时间戳,一个事物列表,一个校验, 一个前快的散链表
2.3 什么是挖矿

挖矿着实非常简朴就做了以下三件事:
1、盘算工作量证明poW
2、通过新增一个买卖业务赋予矿工(自已)一个币
3、构造新区块并将其添加到链中
2.4 工作量证明算法:

使用该算法来证明是如安在区块上创建和发掘新的区块,pow的目的是盘算出一个符合特定条件的数字,这个数字对于全部人而言必须在盘算上非常困难,但易于验证,这就是工作证明背后的焦颔首脑盘算难度与目的字符串必要满意的特定字符串成正比。
2.5 实当代码


  1.     import hashlib
  2.     import json
  3.     import requests
  4.     from textwrap import dedent
  5.     from time import time
  6.     from uuid import uuid4
  7.     from urllib.parse import urlparse
  8.     from flask import Flask, jsonify, request
  9.    
  10.     class Blockchain(object):
  11.         def __init__(self):
  12.             ...
  13.             self.nodes = set()
  14.             # 用 set 来储存节点,避免重复添加节点.
  15.             ...
  16.             self.chain = []
  17.             self.current_transactions = []
  18.    
  19.             #创建创世区块
  20.             self.new_block(previous_hash=1,proof=100)
  21.    
  22.         def reister_node(self,address):
  23.             """
  24.             在节点列表中添加一个新节点
  25.             :param address:
  26.             :return:
  27.             """
  28.             prsed_url = urlparse(address)
  29.             self.nodes.add(prsed_url.netloc)
  30.    
  31.         def valid_chain(self,chain):
  32.             """
  33.             确定一个给定的区块链是否有效
  34.             :param chain:
  35.             :return:
  36.             """
  37.             last_block = chain[0]
  38.             current_index = 1
  39.    
  40.             while current_index<len(chain):
  41.                 block = chain[current_index]
  42.                 print(f'{last_block}')
  43.                 print(f'{block}')
  44.                 print("\n______\n")
  45.                 # 检查block的散列是否正确
  46.                 if block['previous_hash'] != self.hash(last_block):
  47.                     return False
  48.                 # 检查工作证明是否正确
  49.                 if not self.valid_proof(last_block['proof'], block['proof']):
  50.                     return False
  51.    
  52.                 last_block = block
  53.                 current_index += 1
  54.             return True
  55.         def ressolve_conflicts(self):
  56.             """
  57.             共识算法
  58.             :return:
  59.             """
  60.             neighbours = self.nodes
  61.             new_chain = None
  62.             # 寻找最长链条
  63.             max_length = len(self.chain)
  64.    
  65.             # 获取并验证网络中的所有节点的链
  66.             for node in neighbours:
  67.                 response = requests.get(f'http://{node}/chain')
  68.    
  69.                 if response.status_code == 200:
  70.                     length = response.json()['length']
  71.                     chain = response.json()['chain']
  72.    
  73.                     # 检查长度是否长,链是否有效
  74.                     if length > max_length and self.valid_chain(chain):
  75.                         max_length = length
  76.                         new_chain = chain
  77.    
  78.             # 如果发现一个新的有效链比当前的长,就替换当前的链
  79.             if new_chain:
  80.                 self.chain = new_chain
  81.                 return True
  82.             return False
  83.    
  84.         def new_block(self,proof,previous_hash=None):
  85.             """
  86.             创建一个新的块并将其添加到链中
  87.             :param proof: 由工作证明算法生成证明
  88.             :param previous_hash: 前一个区块的hash值
  89.             :return: 新区块
  90.             """
  91.             block = {
  92.                 'index':len(self.chain)+1,
  93.                 'timestamp':time(),
  94.                 'transactions':self.current_transactions,
  95.                 'proof':proof,
  96.                 'previous_hash':previous_hash or self.hash(self.chain[-1]),
  97.             }
  98.    
  99.             # 重置当前交易记录
  100.             self.current_transactions = []
  101.    
  102.             self.chain.append(block)
  103.             return block
  104.    
  105.         def new_transaction(self,sender,recipient,amount):
  106.             # 将新事务添加到事务列表中
  107.             """
  108.             Creates a new transaction to go into the next mined Block
  109.             :param sender:发送方的地址
  110.             :param recipient:收信人地址
  111.             :param amount:数量
  112.             :return:保存该事务的块的索引
  113.             """
  114.             self.current_transactions.append({
  115.                 'sender':sender,
  116.                 'recipient':recipient,
  117.                 'amount':amount,
  118.             })
  119.    
  120.             return  self.last_block['index'] + 1
  121.         @staticmethod
  122.         def hash(block):
  123.             """
  124.             给一个区块生成 SHA-256 值
  125.             :param block:
  126.             :return:
  127.             """
  128.             # 必须确保这个字典(区块)是经过排序的,否则将会得到不一致的散列
  129.             block_string = json.dumps(block,sort_keys=True).encode()
  130.             return hashlib.sha256(block_string).hexdigest()
  131.    
  132.         @property
  133.         def last_block(self):
  134.             # 返回链中的最后一个块
  135.             return self.chain[-1]
  136.         def proof_of_work(self,last_proof):
  137.             # 工作算法的简单证明
  138.             proof = 0
  139.             while self.valid_proof(last_proof,proof)is False:
  140.                 proof +=1
  141.             return proof
  142.    
  143.         @staticmethod
  144.         def valid_proof(last_proof,proof):
  145.             # 验证证明
  146.             guess =  f'{last_proof}{proof}'.encode()
  147.             guess_hash = hashlib.sha256(guess).hexdigest()
  148.             return guess_hash[:4] =="0000"
  149.     # 实例化节点
  150.     app = Flask(__name__)
  151.    
  152.     # 为该节点生成一个全局惟一的地址
  153.     node_identifier = str(uuid4()).replace('-','')
  154.    
  155.     # 实例化Blockchain类
  156.     blockchain = Blockchain()
  157.    
  158.     # 进行挖矿请求
  159.     @app.route('/mine',methods=['GET'])
  160.     def mine():
  161.         # 运行工作算法的证明来获得下一个证明。
  162.         last_block = blockchain.last_block
  163.         last_proof = last_block['proof']
  164.         proof = blockchain.proof_of_work(last_proof)
  165.    
  166.         # 必须得到一份寻找证据的奖赏。
  167.         blockchain.new_transaction(
  168.             sender="0",
  169.             recipient=node_identifier,
  170.             amount=1,
  171.         )
  172.    
  173.         # 通过将其添加到链中来构建新的块
  174.         previous_hash = blockchain.hash(last_block)
  175.         block = blockchain.new_block(proof,previous_hash)
  176.         response = {
  177.             'message': "New Block Forged",
  178.             'index': block['index'],
  179.             'transactions': block['transactions'],
  180.             'proof': block['proof'],
  181.             'previous_hash': block['previous_hash'],
  182.         }
  183.         return jsonify(response), 200
  184.    
  185.     # 创建交易请求
  186.     @app.route('/transactions/new',methods=['POST'])
  187.     def new_transactions():
  188.         values = request.get_json()
  189.    
  190.         # 检查所需要的字段是否位于POST的data中
  191.         required = ['seder','recipient','amount']
  192.         if not all(k in values for k in request):
  193.             return 'Missing values',400
  194.    
  195.         #创建一个新的事物
  196.         index = blockchain.new_transaction(values['sender'], values['recipient'], values['amount'])
  197.         response = {'message': f'Transaction will be added to Block {index}'}
  198.         return jsonify(response), 201
  199.    
  200.     # 获取所有快信息
  201.     @app.route('/chain',methods=['GET'])
  202.     def full_chain():
  203.         response = {
  204.             'chain':blockchain.chain,
  205.             'length':len(blockchain.chain),
  206.         }
  207.         return jsonify(response),200
  208.    
  209.     # 添加节点
  210.     @app.route('/nodes/register',methods=['POST'])
  211.     def  register_nodes():
  212.         values = request.get_json()
  213.         nodes = values.get('nodes')
  214.         if nodes is None:
  215.             return "Error: Please supply a valid list of nodes", 400
  216.    
  217.         for node in nodes:
  218.             blockchain.register_node(node)
  219.    
  220.         response = {
  221.             'message': 'New nodes have been added',
  222.             'total_nodes': list(blockchain.nodes),
  223.         }
  224.         return jsonify(response), 201
  225.    
  226.     # 解决冲突
  227.     @app.route('/nodes/resolve', methods=['GET'])
  228.     def consensus():
  229.         replaced = blockchain.resolve_conflicts()
  230.    
  231.         if replaced:
  232.             response = {
  233.                 'message': 'Our chain was replaced',
  234.                 'new_chain': blockchain.chain
  235.             }
  236.         else:
  237.             response = {
  238.                 'message': 'Our chain is authoritative',
  239.                 'chain': blockchain.chain
  240.             }
  241.    
  242.         return jsonify(response), 200
  243.    
  244.     if __name__ == '__main__':
  245.         app.run(host='0.0.0.0',port=5000)
复制代码
代码弄好启动你的项目以后打开Postman 完成以下操纵

学长通过哀求 http://localhost:5000/mine举行采矿

3 末了

🧿 更多资料, 项目分享:
https://gitee.com/dancheng-senior/postgraduate

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录

QQ|手机版|qidao123.com IT社区;IT企服评测▪应用市场 ( 浙ICP备20004199|浙ICP备20004199号 )|网站地图

GMT+8, 2026-4-17 08:12 , Processed in 0.200992 second(s), 32 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2026 Discuz! Team.

快速回复 返回顶部 返回列表