SteemData Notify 代码学习一: Blockchain Worker / Code Study of SteemData Notify: Part one

话说学习编码最好的方式就是读优秀的代码和写代码。
尤其是读优秀的代码,既然自己写的代码很垃圾,多读读人家大牛们的优秀作品,受一下熏陶,沾惹点仙灵之气也好。俗话说:读书破万卷,下笔如有神!,俗话还说:熟读唐诗三百首,不会作诗也会吟!,那我把大牛们的代码读几遍,是不是也会写出牛光闪闪的代码呢?

又扯远了,言归正传,昨天给大家介绍了一下SteemData Notify,觉得是个挺有意思的产品:

今天呢,就来学习一下它的代码,看看具体是如何实现相关功能的。

代码功能模块

代码托管在github上,地址是: https://github.com/SteemData/notify.steemdata.com

通过简单分析,可以知晓代码分为应用(网站)端和后端

  • 应用(网站)端 src/app.py
  • 后端 src/worker.py

应用端: 使用Python + Flask + MongoDB
后端则主要使用: Python + Steem官方Python库 + MongoDB

我们这节着重分析后端的代码

后端代码

源码在这里
https://github.com/SteemData/notify.steemdata.com/blob/master/src/worker.py

通过阅读main函数
我们可以知道代码分成三大逻辑块

  • blockchain worker
  • confirmation worker
  • notifier worker

顾名思义,分别区块链工作进程、确认进程、通知进程, 分别通过不同的命令行参数启动。
至于为何不使用线程?以我实际经验,官方的Python 库对线程不是特别友好,尤其是一些复杂情况,可能导致很多意想不到的问题,所以多跑俩进程貌似也没啥不好的。或许作者有别的方面的思量,就不得而知了。

这篇文章我们主要学习三者之一的Blockchain Worker

Blockchain Worker

函数名: run_blockchain_worker()

    b = Blockchain()
    types = [
        'account_update',
        'change_recovery_account',
        'request_account_recovery',
        'transfer',
        'transfer_from_savings',
        'set_withdraw_vesting_route',
        'withdraw_vesting',
        'fill_order',
        'fill_convert_request',
        'fill_transfer_from_savings',
        'fill_vesting_withdraw',
]

b = Blockchain() 定义了Blockchain 实例
其本质呢,就是封装了一些对steem 节点的API操作
其中一个基本的操作是:
stream(self, filter_by: Union[str, list] = list(), *args, **kwargs)
简单的解释就是把steem区块链上发生的操作yield成操作流,然后可以针对不同的操作类型进行处理,比如说投票机器人就是Yield出来文章流,然后投票。

types = [ xxxx]
这一大堆,就是上述函数中的filter_by参数,用来过滤抽取我们需要的相关操作,忽略掉无关内容

有了上述对Blockchain以及filter_by的讲解,下面的代码就很好理解了

    try:
        block = db.last_processed_block.find_one()
        start_block = int(block['block_num']) - 1
    except Exception:
        start_block = None
    for op in b.stream(filter_by=types, start_block=start_block):
        processed = db.processed_blockchains.find({'_id': op['_id']}).count()
        if not processed:
            if parse_blockchain(op):
                db.processed_blockchains.insert_one(op)
                db.last_processed_block.delete_many({})
                db.last_processed_block.insert_one(op)

大致就是,把我们关心的操作读取回来,并且写到数据库中。

processed = db.processed_blockchains.find({'_id': op['_id']}).count()
判断我们是否已经处理过了,如果已经处理过,略过。

parse_blockchain(op)
这个判断是否是我们关心的数据,稍后详细讲

db.processed_blockchains.insert_one(op)
将op 插入我们处理过的op列表(mongodb)

db.last_processed_block.delete_many({})
db.last_processed_block.insert_one(op)
其实就是保存一下我们处理到哪里了,这样一旦因故障等问题中断,我们还可以接续上。

parse_blockchain(op)

前文我们说过,这个来获取我们关心的操作,这个是如何实现的呢?

def parse_blockchain(op):
    settings = None
    message = None

    if op['type'] == 'account_update':
        settings = find_user_settings(op['account'])
        if settings and settings['account_update']:
            message = 'Received event: account_update (%s)' % op['account']

    elif op['type'] in ['transfer', 'transfer_from_savings']:
        settings = find_user_settings(op['from'])
        if settings and settings[op['type']]:
            message = 'Received event: %s\nEvent detail: %s -> %s (%s)' % (
                op['type'], op['from'], op['to'], op['amount'],
            )
....

    if settings and message:
        db.notifications.insert_one({
            'username': settings['username'],
            'email': settings['email'],
            'telegram_channel_id': settings['telegram_channel_id'],
            'message': message,
            'email_sent': False,
            'telegram_sent': False,
            'created_at': datetime.utcnow(),
        })

    return True

嗯,我好像发现了一个了不起的BUG,一会再说。
这里又出来一个函数

def find_user_settings(username):
    try:
        rows = db.settings.find({'username': username, 'confirmed': True}).sort('created_at', -1)
        return rows[0]
    except Exception:
        return dict()

这个函数的功能,就是按用户名,读取用户(已确认)的设置并返回。

    if op['type'] == 'account_update':
        settings = find_user_settings(op['account'])
        if settings and settings['account_update']:
            message = 'Received event: account_update (%s)' % op['account']

这样,这段代码就好理解了,按白话文翻译就是
如果操作是更新账户,
那么我就去设置表读取这个用户的设置
如果存在设置表中有这个用户的设置 并且 这个用户设置了关心 'account_update'
那么通知信息就是吧啦啦啦
是不是很好理解

因为操作只能是这些操作中的一种,所以要逐一判断一下,直到发现或者判断完毕没有发现为止。

    if settings and message:
        db.notifications.insert_one({
            'username': settings['username'],
            'email': settings['email'],
            'telegram_channel_id': settings['telegram_channel_id'],
            'message': message,
            'email_sent': False,
            'telegram_sent': False,
            'created_at': datetime.utcnow(),
        })

    return True

如果发现了需要处理的操作,将其插入notifications 表。

BUG,BUG
注意这个 return True, 注意缩进,OMG,少缩进了一个TAB有木有?
这样,原本只有我们处理到的op 才应该返回True, 结果统统返回True了。

你问我后果是啥?貌似没啥后果,浪费一些数据库空间和CPU运算能力而已。

总结

  • Blockchain Worker 将blockchain上和账户有关的动态抓取进来
  • 判断是否是SteemData Notify 注册(并确认)用户相关的操作以及是否是用户关心的操作
  • 如果是,写入数据库通知表notifications

咦,一总结好像也没啥内容呢,那我为啥写了这么多呢?晕,这啰嗦的毛病要改一改了。
初学者水平有限,如有谬误敬请指正,深表谢意啦。


感谢阅读 / Thank you for reading.
欢迎upvote、resteem以及 following me @oflyhigh 😎

H2
H3
H4
3 columns
2 columns
1 column
36 Comments