新聞中心
golang-redis系列——返回值助手函數(shù)(二)
從上一節(jié)的內(nèi)容可知,Do() 和 Receive() 等方法的返回值,除了 error 外,是一個 interface{} 類型的返回值,因此當我們的復(fù)雜操作返回的不是基本數(shù)據(jù)類型時,就需要我們自己解析返回值,例如,當我們利用 HMGET 方法獲取一批返回值時,就需要對返回結(jié)果進行解析,具體如下:
創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價比東平網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式東平網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋東平地區(qū)。費用合理售后完善,10年實體公司更值得信賴。
由于返回值是多條數(shù)據(jù),因此需要先將 reply 轉(zhuǎn)成 []interface 類型,然后在遍歷結(jié)果時在分別轉(zhuǎn)成 []uint8 (byte數(shù)組), 最后再轉(zhuǎn)成 string 類型。
隨著我們操作復(fù)雜度,數(shù)據(jù)解析的工作量也會非常大,(lua 腳本的使用,會使結(jié)果的解析更為復(fù)雜,因為可能存在多種類型的結(jié)果一起返回的情況,lua 腳本相關(guān)的內(nèi)容會在下一節(jié)介紹)。
redigo 包中的返回值助手函數(shù)的存在,就是為了幫助我們完成這些枯燥繁瑣的數(shù)據(jù)解析過程。
返回值助手函數(shù)相關(guān)源碼路徑為 github.com/gomodule/redigo/redis/reply.go 提供的主要方法如下:
上述返回值助手函數(shù)的具體使用,應(yīng)該依據(jù)具體的命令進行選擇。如果大家還記得上一節(jié)介紹的 Redis 基本數(shù)據(jù)類型,可能會有些疑問,對于 redis 來說,其數(shù)據(jù)據(jù)存儲本質(zhì)都是 []bytes, 為什么可以解析出 Int、int64、float等類型的數(shù)據(jù)呢?
我們以 Float64() 為例進行說明,具體源碼如下:
其實,返回值助手函數(shù)是將 []byte 類型的原始數(shù)據(jù),利用 strconv.ParseFloat(string(reply), 64) 轉(zhuǎn)換成了 float64類型,因此在我們使用過程中返回值助手函數(shù)的選擇,應(yīng)該基于業(yè)務(wù)和實際存儲的數(shù)據(jù)格式為依據(jù)。我們以第一小節(jié)的示例為例,看返回值助手函數(shù)如何降低我們的工作量,具體如下:
除了使用返回值助手函數(shù)對上述固定結(jié)構(gòu)的結(jié)果進行解析外,redigo 包還提供了一個 Scan()函數(shù)用于解析自定義的復(fù)雜數(shù)據(jù)結(jié)構(gòu),我們依然以上一個示例進行說明,具體示例如下:
如果返回結(jié)果為結(jié)構(gòu)化切片,也可以使用 canSlice() 方法,從而簡化 loop 處理的部分,具體示例如下:
通過上述的示例,我們介紹了 scan 函數(shù)的基本用法,但是細心的同學(xué)可能會發(fā)現(xiàn)嗎,為什么數(shù)據(jù)寫入時,value 的類型為 []int64 但是讀取時只能按照 string 類型讀取呢。這是因為 Redis 底層存儲的數(shù)據(jù)本質(zhì)都是 string 類型,。 無論是 HMSET 還是 MSET 最終都只能按照 string 類型讀取,因為其本質(zhì)都是 hash 結(jié)構(gòu),不同之處僅在于 HMSET 是嵌套的 hash類型。 因此,[]int64 數(shù)據(jù)在寫入階段,就已經(jīng)被自動處理為 []byte,寫入 redis 之后,len 和 類型 屬性會丟失。
如果強行按照 []int64解析將出錯:
如果 value 必須以結(jié)構(gòu)化的數(shù)據(jù)存儲,那么可以提前對要寫入的數(shù)據(jù)進行編碼,例如 json、protobuf 等,取出后再進行解碼獲得原始數(shù)據(jù)。
詳解golang中bufio包的實現(xiàn)原理
最近用golang寫了一個處理文件的腳本,由于其中涉及到了文件讀寫,開始使用golang中的 io 包,后來發(fā)現(xiàn)golang 中提供了一個bufio的包,使用這個包可以大幅提高文件讀寫的效率,于是在網(wǎng)上搜索同樣的文件讀寫為什么bufio 要比io的讀寫更快速呢?根據(jù)網(wǎng)上的資料和閱讀源碼,以下來詳細解釋下bufio的高效如何實現(xiàn)的。
bufio 包介紹?
bufio包實現(xiàn)了有緩沖的I/O。它包裝一個io.Reader或io.Writer接口對象,創(chuàng)建另一個也實現(xiàn)了該接口,且同時還提供了緩沖和一些文本I/O的幫助函數(shù)的對象。
以上為官方包的介紹,在其中我們能了解到的信息如下:
bufio 是通過緩沖來提高效率
簡單的說就是,把文件讀取進緩沖(內(nèi)存)之后再讀取的時候就可以避免文件系統(tǒng)的io 從而提高速度。同理,在進行寫操作時,先把文件寫入緩沖(內(nèi)存),然后由緩沖寫入文件系統(tǒng)。看完以上解釋有人可能會表示困惑了,直接把 內(nèi)容-文件 和 內(nèi)容-緩沖-文件相比, 緩沖區(qū)好像沒有起到作用嘛。其實緩沖區(qū)的設(shè)計是為了存儲多次的寫入,最后一口氣把緩沖區(qū)內(nèi)容寫入文件。下面會詳細解釋
bufio 封裝了io.Reader或io.Writer接口對象,并創(chuàng)建另一個也實現(xiàn)了該接口的對象
io.Reader或io.Writer 接口實現(xiàn)read() 和 write() 方法,對于實現(xiàn)這個接口的對象都是可以使用這兩個方法的
bufio 包實現(xiàn)原理
bufio 源碼分析
Reader對象
bufio.Reader 是bufio中對io.Reader 的封裝
// Reader implements buffering for an io.Reader object.
type Reader struct {
buf???? []byte
rd????? io.Reader // reader provided by the client
r, w???? int??? // buf read and write positions
err???? error
lastByte?? int
lastRuneSize int
}
bufio.Read(p []byte) 相當于讀取大小len(p)的內(nèi)容,思路如下:
當緩存區(qū)有內(nèi)容的時,將緩存區(qū)內(nèi)容全部填入p并清空緩存區(qū)
當緩存區(qū)沒有內(nèi)容的時候且len(p)len(buf),即要讀取的內(nèi)容比緩存區(qū)還要大,直接去文件讀取即可
當緩存區(qū)沒有內(nèi)容的時候且len(p)len(buf),即要讀取的內(nèi)容比緩存區(qū)小,緩存區(qū)從文件讀取內(nèi)容充滿緩存區(qū),并將p填滿(此時緩存區(qū)有剩余內(nèi)容)
以后再次讀取時緩存區(qū)有內(nèi)容,將緩存區(qū)內(nèi)容全部填入p并清空緩存區(qū)(此時和情況1一樣)
以下是源碼
// Read reads data into p.
// It returns the number of bytes read into p.
// The bytes are taken from at most one Read on the underlying Reader,
// hence n may be less than len(p).
// At EOF, the count will be zero and err will be io.EOF.
func (b *Reader) Read(p []byte) (n int, err error) {
n = len(p)
if n == 0 {
return 0, b.readErr()
}
if b.r == b.w {
if b.err != nil {
return 0, b.readErr()
}
if len(p) = len(b.buf) {
// Large read, empty buffer.
// Read directly into p to avoid copy.
n, b.err = b.rd.Read(p)
if n 0 {
panic(errNegativeRead)
}
if n 0 {
b.lastByte = int(p[n-1])
b.lastRuneSize = -1
}
return n, b.readErr()
}
// One read.
// Do not use b.fill, which will loop.
b.r = 0
b.w = 0
n, b.err = b.rd.Read(b.buf)
if n 0 {
panic(errNegativeRead)
}
if n == 0 {
return 0, b.readErr()
}
b.w += n
}
// copy as much as we can
n = copy(p, b.buf[b.r:b.w])
b.r += n
b.lastByte = int(b.buf[b.r-1])
b.lastRuneSize = -1
return n, nil
}
說明:
reader內(nèi)部通過維護一個r, w 即讀入和寫入的位置索引來判斷是否緩存區(qū)內(nèi)容被全部讀出
Writer對象
bufio.Writer 是bufio中對io.Writer 的封裝
// Writer implements buffering for an io.Writer object.
type Writer struct {
err error
buf []byte
n? int
wr io.Writer
}
bufio.Write(p []byte) 的思路如下
判斷buf中可用容量是否可以放下 p
如果能放下,直接把p拼接到buf后面,即把內(nèi)容放到緩沖區(qū)
如果緩沖區(qū)的可用容量不足以放下,且此時緩沖區(qū)是空的,直接把p寫入文件即可
如果緩沖區(qū)的可用容量不足以放下,且此時緩沖區(qū)有內(nèi)容,則用p把緩沖區(qū)填滿,把緩沖區(qū)所有內(nèi)容寫入文件,并清空緩沖區(qū)
判斷p的剩余內(nèi)容大小能否放到緩沖區(qū),如果能放下(此時和步驟1情況一樣)則把內(nèi)容放到緩沖區(qū)
如果p的剩余內(nèi)容依舊大于緩沖區(qū),(注意此時緩沖區(qū)是空的,情況和步驟2一樣)則把p的剩余內(nèi)容直接寫入文件
// Write writes the contents of p into the buffer.
// It returns the number of bytes written.
// If nn len(p), it also returns an error explaining
// why the write is short.
func (b *Writer) Write(p []byte) (nn int, err error) {
for len(p) b.Available() b.err == nil {
var n int
if b.Buffered() == 0 {
// Large write, empty buffer.
// Write directly from p to avoid copy.
n, b.err = b.wr.Write(p)
} else {
n = copy(b.buf[b.n:], p)
b.n += n
b.flush()
}
nn += n
p = p[n:]
}
if b.err != nil {
return nn, b.err
}
n := copy(b.buf[b.n:], p)
b.n += n
nn += n
return nn, nil
}
說明:
b.wr 存儲的是一個io.writer對象,實現(xiàn)了Write()的接口,所以可以使用b.wr.Write(p) 將p的內(nèi)容寫入文件
b.flush() 會將緩存區(qū)內(nèi)容寫入文件,當所有寫入完成后,因為緩存區(qū)會存儲內(nèi)容,所以需要手動flush()到文件
b.Available() 為buf可用容量,等于len(buf) - n
下圖解釋的是其中一種情況,即緩存區(qū)有內(nèi)容,剩余p大于緩存區(qū)
golang p2p網(wǎng)
繼續(xù)進入下一個初始化
n.netService, err = nebnet.NewNebService(n)
if err != nil {
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Fatal("Failed to setup net service.")
}
netservice有兩個成員
type NebServicestruct {
node? ? ? *Node
dispatcher *Dispatcher
}
跳出stup()函數(shù)
先進入start()函數(shù)看一看
if err := n.netService.Start(); err != nil {
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Fatal("Failed to start net service.")
}
進入netservice.start()
func (ns *NebService) Start() error {
logging.CLog().Info("Starting NebService...")
// start dispatcher.
ns.dispatcher.Start()
// start node.
if err := ns.node.Start(); err != nil {
ns.dispatcher.Stop()
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Error("Failed to start NebService.")
return err
}
logging.CLog().Info("Started NebService.")
return nil
}
可以看到第一個start()的函數(shù)是dispatcher.start()
進入dispatch.start()
func (dp *Dispatcher) Start() {
logging.CLog().Info("Starting NebService Dispatcher...")
go dp.loop()
}
然后就出現(xiàn)一個新的線程、goruntime
go dp.loop()
進入該線程,看它干了些什么
timerChan := time.NewTicker(time.Second).C
for {
select {
case -timerChan:
metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))
case -dp.quitCh:
logging.CLog().Info("Stoped NebService Dispatcher.")
return
case msg := -dp.receivedMessageCh:
msgType := msg.MessageType()
v, _ := dp.subscribersMap.Load(msgType)
if v == nil {
continue
? }
m, _ := v.(*sync.Map)
m.Range(func(key, valueinterface{}) bool {
select {
case key.(*Subscriber).msgChan - msg:
default:
logging.VLog().WithFields(logrus.Fields{
"msgType": msgType,
}).Warn("timeout to dispatch message.")
}
return true
? })
}
}
一個有點長的循環(huán)
metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))一秒鐘刷新一次緩沖區(qū)
case msg := -dp.receivedMessageCh:
msgType := msg.MessageType()如果能取出dp.receivedMessageCh
msgType := msg.MessageType()首先判斷取出的信息類型
v, _ := dp.subscribersMap.Load(msgType)
if v == nil {
continue
}
根據(jù)類型取出相應(yīng)的map
如果取不出,那么使用continue結(jié)束這個case
m, _ := v.(*sync.Map)
斷言
m.Range(func(key, valueinterface{}) bool {
select {
case key.(*Subscriber).msgChan - msg:
default:
logging.VLog().WithFields(logrus.Fields{
"msgType": msgType,
}).Warn("timeout to dispa+tch message.")
}
return true
})
將msg推入其他管道里面去。其他goruntime會循環(huán)等待該
C語言goto語句問題:goto loop與goto repeat有什么區(qū)別嗎?
沒有區(qū)別goto
是跳轉(zhuǎn)語句,后面是標識符,自己命名的.沒什么特別含義,他這樣寫是為了區(qū)分,和讓程序更加可讀(這里可讀是指標識符相對于無意義的字符,并不是說goto語句會是程序可讀性更好,實際剛好相反).goto
loop從語義上理解應(yīng)該是跳轉(zhuǎn)到某一個循環(huán)里面讓人知道跳到那個地方去(即前面所謂的可讀好)
goto格式是
asd:xxxxxx
xxxxxxxx
xxxxxxx
goto
asd;
就是跳到asd重新在運行一次的意思,如果上面xxx中沒有結(jié)束或者跳出的語句上面就會變成死循環(huán).和
while(1)一個意思
一般程序中很少或根本不使用goto語句的,用了會很大的破壞可讀性,和帶來維護上的困難,所以不推薦使用
C語言loop: 這個什么意思?goto loop;
這個例子來說吧:loop: A語句;...(其他語句)goto loop;則goto loop表示程序轉(zhuǎn)去執(zhí)行l(wèi)oop標記的那個語句!
協(xié)程與異步IO
協(xié)程,又稱微線程,纖程。英文名 Coroutine 。Python對協(xié)程的支持是通過 generator 實現(xiàn)的。在generator中,我們不但可以通過for循環(huán)來迭代,還可以不斷調(diào)用 next()函數(shù) 獲取由 yield 語句返回的下一個值。但是Python的yield不但可以返回一個值,它還可以接收調(diào)用者發(fā)出的參數(shù)。yield其實是終端當前的函數(shù),返回給調(diào)用方。python3中使用yield來實現(xiàn)range,節(jié)省內(nèi)存,提高性能,懶加載的模式。
asyncio是Python 3.4 版本引入的 標準庫 ,直接內(nèi)置了對異步IO的支持。
從Python 3.5 開始引入了新的語法 async 和 await ,用來簡化yield的語法:
import asyncio
import threading
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
print(threading.current_thread().name)
await asyncio.sleep(x + y)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
print(threading.current_thread().name)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
tasks = [print_sum(1, 2), print_sum(3, 4)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
線程是內(nèi)核進行搶占式的調(diào)度的,這樣就確保了每個線程都有執(zhí)行的機會。而 coroutine 運行在同一個線程中,由語言的運行時中的 EventLoop(事件循環(huán)) 來進行調(diào)度。和大多數(shù)語言一樣,在 Python 中,協(xié)程的調(diào)度是非搶占式的,也就是說一個協(xié)程必須主動讓出執(zhí)行機會,其他協(xié)程才有機會運行。
讓出執(zhí)行的關(guān)鍵字就是 await。也就是說一個協(xié)程如果阻塞了,持續(xù)不讓出 CPU,那么整個線程就卡住了,沒有任何并發(fā)。
PS: 作為服務(wù)端,event loop最核心的就是IO多路復(fù)用技術(shù),所有來自客戶端的請求都由IO多路復(fù)用函數(shù)來處理;作為客戶端,event loop的核心在于利用Future對象延遲執(zhí)行,并使用send函數(shù)激發(fā)協(xié)程,掛起,等待服務(wù)端處理完成返回后再調(diào)用CallBack函數(shù)繼續(xù)下面的流程
Go語言的協(xié)程是 語言本身特性 ,erlang和golang都是采用了CSP(Communicating Sequential Processes)模式(Python中的協(xié)程是eventloop模型),但是erlang是基于進程的消息通信,go是基于goroutine和channel的通信。
Python和Go都引入了消息調(diào)度系統(tǒng)模型,來避免鎖的影響和進程/線程開銷大的問題。
協(xié)程從本質(zhì)上來說是一種用戶態(tài)的線程,不需要系統(tǒng)來執(zhí)行搶占式調(diào)度,而是在語言層面實現(xiàn)線程的調(diào)度 。因為協(xié)程 不再使用共享內(nèi)存/數(shù)據(jù) ,而是使用 通信 來共享內(nèi)存/鎖,因為在一個超級大系統(tǒng)里具有無數(shù)的鎖,共享變量等等會使得整個系統(tǒng)變得無比的臃腫,而通過消息機制來交流,可以使得每個并發(fā)的單元都成為一個獨立的個體,擁有自己的變量,單元之間變量并不共享,對于單元的輸入輸出只有消息。開發(fā)者只需要關(guān)心在一個并發(fā)單元的輸入與輸出的影響,而不需要再考慮類似于修改共享內(nèi)存/數(shù)據(jù)對其它程序的影響。
文章名稱:go語言的loop,go語言的特性
分享鏈接:http://ef60e0e.cn/article/hodjhd.html