Compare commits

...

5 Commits

17 changed files with 547 additions and 344 deletions

@ -1,7 +1,7 @@
CWD = $(shell pwd) CWD = $(shell pwd)
define HELP_TEXT define HELP_TEXT
Welcome to obkit! Welcome to okit!
Targets: Targets:
help provides help text help provides help text

@ -2,6 +2,9 @@
Short for "observability kit", `okit` aims to provide an all-in-one solution to application observability. Short for "observability kit", `okit` aims to provide an all-in-one solution to application observability.
_**DISCLAIMER!** This is an early preview. There are significant portions of this repository that still need to be
implemented. Check out the [issues](https://github.com/mjpitz/okit/issues) to see how you can help._
## Why ## Why
Traditional approaches to observability treat logging, application metrics, and tracing as independent operations, with Traditional approaches to observability treat logging, application metrics, and tracing as independent operations, with
@ -55,9 +58,8 @@ func main() {
okit.Emit("user_signup", tags...) okit.Emit("user_signup", tags...)
// Tracing // Tracing
ctx, done := okit.Trace(context.Background(), tags...) ctx := context.Background()
defer done() defer okit.Trace(&ctx, tags...).Done()
_ = ctx.Err() // not needed, removes unused error
// Logging // Logging
okit.Debug("a message used for debugging", tags...) okit.Debug("a message used for debugging", tags...)

104
api.go

@ -22,9 +22,61 @@ package okit
import ( import (
"context" "context"
"strings" "fmt"
) )
const (
// LevelTrace is used to enable bookend events for tracing.
LevelTrace Level = iota
// LevelDebug enables debug level logging for the application.
LevelDebug
// LevelInfo is the default level of logging for applications.
LevelInfo
// LevelWarn decreases log verbosity by removing informational messages from the stream.
LevelWarn
// LevelError further decreases log verbosity by only surfacing errors encountered by the system.
LevelError
)
// Level allows the logging level to be configured.
type Level uint8
// Set allows a given log level to be set to a certain value.
func (l *Level) Set(val string) error {
switch val {
case "trace", "TRACE":
*l = LevelTrace
case "debug", "DEBUG":
*l = LevelTrace
case "info", "INFO":
*l = LevelTrace
case "warn", "WARN":
*l = LevelTrace
case "error", "ERROR":
*l = LevelTrace
}
return fmt.Errorf("unrecognized level")
}
// String returns the string representation of the Level.
func (l *Level) String() string {
switch *l {
case LevelTrace:
return "trace"
case LevelDebug:
return "debug"
case LevelInfo:
return "info"
case LevelWarn:
return "warn"
case LevelError:
return "error"
}
return "unknown"
}
// Logger defines common operations for writing log messages. // Logger defines common operations for writing log messages.
type Logger interface { type Logger interface {
// Debug adds a debug message to the event stream, if configured. // Debug adds a debug message to the event stream, if configured.
@ -55,7 +107,7 @@ type DoneFunc func()
// Tracer allows method calls to be instrumented for debugging. // Tracer allows method calls to be instrumented for debugging.
type Tracer interface { type Tracer interface {
// Trace captures the method calling context and returns a function that can be invoked to complete the trace. // Trace captures the method calling context and returns a function that can be invoked to complete the trace.
Trace(ctx context.Context, tags ...Tag) (context.Context, DoneFunc) Trace(ctx *context.Context, tags ...Tag) ActiveTrace
} }
// Span defines an internal structure for tracking traces across an application. This structure is only used for // Span defines an internal structure for tracking traces across an application. This structure is only used for
@ -83,48 +135,8 @@ type Interface[T any] interface {
Wither[T] Wither[T]
} }
type Level int8 // ActiveTrace defines an abstraction that allows traces to be completed using a Done function.
type ActiveTrace interface {
const ( // Done finishes the current tracing operation.
UnknownLevel = 0 Done()
TraceLevel = 1
DebugLevel = 2
InfoLevel = 3
WarnLevel = 4
ErrorLevel = 5
)
func (l *Level) String() string {
switch *l {
case TraceLevel:
return "trace"
case DebugLevel:
return "debug"
case InfoLevel:
return "info"
case WarnLevel:
return "warn"
case ErrorLevel:
return "error"
}
return "unknown"
}
func (l *Level) Set(val string) error {
switch {
case strings.EqualFold("trace", val):
*l = TraceLevel
case strings.EqualFold("debug", val):
*l = DebugLevel
case strings.EqualFold("info", val):
*l = InfoLevel
case strings.EqualFold("warn", val):
*l = WarnLevel
case strings.EqualFold("error", val):
*l = ErrorLevel
}
*l = UnknownLevel
return nil
} }

217
client.go

@ -26,18 +26,17 @@ import (
"encoding/base32" "encoding/base32"
"io" "io"
"runtime" "runtime"
"sync"
"time" "time"
"go.pitz.tech/okit/observer"
"go.pitz.tech/okit/pb" "go.pitz.tech/okit/pb"
) )
var encoding = base32.StdEncoding.WithPadding(base32.NoPadding) var (
encoding = base32.StdEncoding.WithPadding(base32.NoPadding)
// NewClient produces a new default client implementation for emitting metrics. defaultUUID = func() string {
func NewClient() Client {
return Client{
now: time.Now,
uuid: func() string {
buf := make([]byte, 16) buf := make([]byte, 16)
n, err := io.ReadFull(rand.Reader, buf) n, err := io.ReadFull(rand.Reader, buf)
if err != nil { if err != nil {
@ -45,69 +44,94 @@ func NewClient() Client {
} }
return encoding.EncodeToString(buf[:n]) return encoding.EncodeToString(buf[:n])
}, }
)
// NewClient produces a new default client implementation for emitting metrics.
func NewClient() Client {
return Client{
observer: observer.Local(),
level: LevelInfo,
now: time.Now,
uuid: defaultUUID,
callerSkip: 2, callerSkip: 2,
} }
} }
// Client provides a default implementation. // Client provides a default implementation.
type Client struct { type Client struct {
observer observer.Observer
level Level
now func() time.Time now func() time.Time
uuid func() string uuid func() string
tags []Tag tags []Tag
callerSkip int callerSkip int
} }
// WithObservers configures the new client with the provided observers.
func (c Client) WithObservers(observers ...observer.Observer) Client {
c.observer = observer.Composite(observers)
return c
}
// WithLevel allows the current logging level to be tuned. This does not prevent traces from being emit, or metrics
// from being reported. It only modifies the behavior of logs.
func (c Client) WithLevel(level Level) Client {
c.level = level
return c
}
// WithNow configures the function that's used to obtain the current timestamp. // WithNow configures the function that's used to obtain the current timestamp.
func (o Client) WithNow(now func() time.Time) Client { func (c Client) WithNow(now func() time.Time) Client {
o.now = now c.now = now
return o return c
} }
// WithUUID returns a new uuid that uniquely identifies traces, spans, and tags. // WithUUID returns a new uuid that uniquely identifies traces, spans, and tags.
func (o Client) WithUUID(uuid func() string) Client { func (c Client) WithUUID(uuid func() string) Client {
o.uuid = uuid c.uuid = uuid
return o return c
} }
// WithCallerSkip is used to configure the number of frames to skip when determining the caller. Callers are // WithCallerSkip is used to configure the number of frames to skip when determining the caller. Callers are
// predominantly used when performing traces. // predominantly used when performing traces.
func (o Client) WithCallerSkip(callerSkip int) Client { func (c Client) WithCallerSkip(callerSkip int) Client {
o.callerSkip = callerSkip c.callerSkip = callerSkip
return o return c
} }
// With appends tags to the current client that will automatically be added to all events, metrics, logs, and traces. // With appends tags to the current client that will automatically be added to all events, metrics, logs, and traces.
func (o Client) With(tags ...Tag) Client { func (c Client) With(tags ...Tag) Client {
o.tags = append(o.tags, tags...) c.tags = append(c.tags, tags...)
return o return c
} }
// Emit allows for the emission of an event which has now value and only associated tags. // Emit allows for the emission of an event which has now value and only associated tags.
func (o Client) Emit(event string, tags ...Tag) { func (c Client) Emit(event string, tags ...Tag) {
o.With(tags...).emit(&pb.Entry{ c.With(tags...).emit(&pb.Entry{
Kind: pb.Kind_Event, Kind: pb.Kind_Event,
Scope: event, Scope: event,
}) })
} }
// Observe reports a metric and it's associated value. // Observe reports a metric and it's associated value.
func (o Client) Observe(metric string, value float64, tags ...Tag) { func (c Client) Observe(metric string, value float64, tags ...Tag) {
o.With(tags...).emit(&pb.Entry{ c.With(tags...).With(Float64("value", value)).emit(&pb.Entry{
Value: &pb.Entry_Double{Double: value},
Kind: pb.Kind_Metric, Kind: pb.Kind_Metric,
Scope: metric, Scope: metric,
}) })
} }
// Trace allows a method to be traced, and it's execution time recorded and reported for viewing. // Trace allows a method to be traced, and it's execution time recorded and reported for viewing.
func (o Client) Trace(ctx context.Context, tags ...Tag) (context.Context, DoneFunc) { func (c Client) Trace(ctxp *context.Context, tags ...Tag) ActiveTrace {
name, _ := caller(o.callerSkip) ctx := *ctxp
start := o.now()
name, _ := caller(c.callerSkip)
start := c.now()
span := Span{ span := Span{
TraceID: o.uuid(), TraceID: c.uuid(),
ID: o.uuid(), ID: c.uuid(),
} }
v := ctx.Value(SpanKey) v := ctx.Value(SpanKey)
@ -120,101 +144,142 @@ func (o Client) Trace(ctx context.Context, tags ...Tag) (context.Context, DoneFu
} }
//goland:noinspection GoAssignmentToReceiver //goland:noinspection GoAssignmentToReceiver
o = o.With( c = c.
append( With(
[]Tag{
String("traceId", span.TraceID), String("traceId", span.TraceID),
String("traceSpanId", span.ID), String("traceSpanId", span.ID),
String("traceParentId", span.ParentID), String("traceParentId", span.ParentID),
}, ).
tags..., With(tags...)
)...,
)
// bookend if c.level <= LevelTrace {
o.With(String("bookend", "start")).emit(&pb.Entry{ c.With(String("bookend", "start"), level(LevelTrace)).
emit(&pb.Entry{
Kind: pb.Kind_Log, Kind: pb.Kind_Log,
Scope: name, Scope: name,
Value: &pb.Entry_String_{String_: "trace"},
}) })
}
return context.WithValue(ctx, SpanKey, span), func() { ctx = context.WithValue(ctx, SpanKey, span)
duration := o.now().Sub(start) *ctxp = ctx
// bookend return &active{name, start, c, sync.Once{}}
o.With(String("bookend", "end")).emit(&pb.Entry{ }
Kind: pb.Kind_Log,
Scope: name,
Value: &pb.Entry_String_{String_: "trace"},
})
// trace type active struct {
o.emit( name string
start time.Time
c Client
once sync.Once
}
func (a *active) Done() {
a.once.Do(func() {
duration := a.c.now().Sub(a.start)
//goland:noinspection GoAssignmentToReceiver
a.c = a.c.With(Duration("duration", duration))
if a.c.level <= LevelTrace {
a.c.With(String("bookend", "end"), level(LevelTrace)).
emit(
&pb.Entry{ &pb.Entry{
Kind: pb.Kind_Trace, Kind: pb.Kind_Log,
Scope: name, Scope: a.name,
Value: &pb.Entry_Duration{Duration: pb.DurationPB(duration)},
}, },
) )
} }
// trace
a.c.emit(&pb.Entry{
Kind: pb.Kind_Trace,
Scope: a.name,
})
})
} }
// Debug produces a debug log event. // Debug produces a debug log event.
func (o Client) Debug(msg string, tags ...Tag) { func (c Client) Debug(msg string, tags ...Tag) {
o.With(tags...).emit(&pb.Entry{ if c.level > LevelDebug {
return
}
name, _ := caller(c.callerSkip)
c.With(tags...).
With(message(msg), level(LevelDebug)).
emit(&pb.Entry{
Kind: pb.Kind_Log, Kind: pb.Kind_Log,
Scope: msg, Scope: name,
Value: &pb.Entry_String_{String_: "debug"},
}) })
} }
// Info produces an information log event. // Info produces an information log event.
func (o Client) Info(msg string, tags ...Tag) { func (c Client) Info(msg string, tags ...Tag) {
o.With(tags...).emit(&pb.Entry{ if c.level > LevelInfo {
return
}
name, _ := caller(c.callerSkip)
c.With(tags...).
With(message(msg), level(LevelInfo)).
emit(&pb.Entry{
Kind: pb.Kind_Log, Kind: pb.Kind_Log,
Scope: msg, Scope: name,
Value: &pb.Entry_String_{String_: "info"},
}) })
} }
// Warn produces a warning that is surfaced to operators. // Warn produces a warning that is surfaced to operators.
func (o Client) Warn(msg string, tags ...Tag) { func (c Client) Warn(msg string, tags ...Tag) {
o.With(tags...).emit(&pb.Entry{ if c.level > LevelWarn {
return
}
name, _ := caller(c.callerSkip)
c.With(tags...).
With(message(msg), level(LevelWarn)).
emit(&pb.Entry{
Kind: pb.Kind_Log, Kind: pb.Kind_Log,
Scope: msg, Scope: name,
Value: &pb.Entry_String_{String_: "warn"},
}) })
} }
// Error produces a message that communicates an error has occurred. // Error produces a message that communicates an error has occurred.
func (o Client) Error(msg string, tags ...Tag) { func (c Client) Error(msg string, tags ...Tag) {
o.With(tags...).emit(&pb.Entry{ if c.level > LevelError {
return
}
name, _ := caller(c.callerSkip)
c.With(tags...).
With(message(msg), level(LevelError)).
emit(&pb.Entry{
Kind: pb.Kind_Log, Kind: pb.Kind_Log,
Scope: msg, Scope: name,
Value: &pb.Entry_String_{String_: "error"},
}) })
} }
func (o Client) emit(entries ...*pb.Entry) { func (c Client) emit(entries ...*pb.Entry) {
now := pb.TimestampPB(o.now()) now := pb.TimestampPB(c.now())
tags := make([]*pb.Tag, 0, len(o.tags)) tags := make([]*pb.Tag, 0, len(c.tags))
for _, tag := range o.tags { for _, tag := range c.tags {
tags = append(tags, tag.AsTagPB()) tags = append(tags, tag.AsTagPB())
} }
for _, entry := range entries { for _, entry := range entries {
entry.Timestamp = now entry.Timestamp = now
entry.Tags = tags entry.Tags = tags
_ = DefaultFormat.Marshal(sink, entry)
} }
_ = sink.Flush() c.observer.Observe(entries)
} }
// caller attempts to get method caller information. This information is used for tracing information across an // caller attempts to get method caller information. This information is used for tracing information across an
// applications source code. // applications source code. Need to look into the performance of this call. Parts of it may be able to be cached.
func caller(skip int) (name string, line int) { func caller(skip int) (name string, line int) {
pctr, _, line, ok := runtime.Caller(skip) pctr, _, line, ok := runtime.Caller(skip)
if !ok { if !ok {

@ -25,11 +25,20 @@ import (
"go.pitz.tech/okit" "go.pitz.tech/okit"
okithttp "go.pitz.tech/okit/http" okithttp "go.pitz.tech/okit/http"
"go.pitz.tech/okit/observer"
) )
func main() { func main() {
// Enable JSON output client := okit.NewClient().
okit.DefaultFormat = okit.JSONFormat{} WithObservers(
observer.Local(
observer.Format("json"),
),
// TODO: add remote observers
)
okit.Replace(client)
// Instrument HTTP Clients // Instrument HTTP Clients
okithttp.InstrumentClient(http.DefaultClient) okithttp.InstrumentClient(http.DefaultClient)

@ -27,6 +27,9 @@ import (
) )
func main() { func main() {
client := okit.NewClient().WithLevel(okit.LevelTrace)
okit.Replace(client)
var tags []okit.Tag var tags []okit.Tag
// Metric emission // Metric emission
@ -39,9 +42,8 @@ func main() {
okit.Emit("user_signup", tags...) okit.Emit("user_signup", tags...)
// Tracing // Tracing
ctx, done := okit.Trace(context.Background(), tags...) ctx := context.Background()
defer done() defer okit.Trace(&ctx, tags...).Done()
_ = ctx.Err() // not needed, removes unused error
// Logging // Logging
okit.Debug("a message used for debugging", tags...) okit.Debug("a message used for debugging", tags...)

@ -1,61 +0,0 @@
package okit
import (
"bufio"
"strconv"
"strings"
"time"
"github.com/golang/protobuf/jsonpb"
"go.pitz.tech/okit/pb"
)
// Format defines an abstraction for writing entries to stdout/stderr. This is predominantly used in report log, trace,
// and event information from the process.
type Format interface {
Marshal(writer *bufio.Writer, entry *pb.Entry) error
}
// JSONFormat writes entries using JSON encoded protocol buffers.
type JSONFormat struct {
Marshaler jsonpb.Marshaler
}
func (f JSONFormat) Marshal(writer *bufio.Writer, entry *pb.Entry) error {
f.Marshaler.Marshal(writer, entry)
return writer.WriteByte('\n')
}
// TextFormat writes entries using a custom text format.
type TextFormat struct{}
func (f TextFormat) Marshal(writer *bufio.Writer, entry *pb.Entry) error {
sink.WriteString(entry.Timestamp.AsTime().Format(time.RFC3339))
sink.WriteByte('\t')
sink.WriteString(strings.ToUpper(entry.Kind.String()))
sink.WriteByte('\t')
switch v := entry.GetValue().(type) {
case *pb.Entry_String_:
sink.WriteString(v.String_)
case *pb.Entry_Double:
sink.WriteString(strconv.FormatFloat(v.Double, 'f', 5, 64))
case *pb.Entry_Duration:
sink.WriteString(v.Duration.AsDuration().String())
}
sink.WriteByte('\t')
sink.WriteString(entry.Scope)
sink.WriteByte('\t')
for _, tag := range entry.Tags {
qs := tag.AsQueryString()
if qs == "" {
continue
}
sink.WriteString(qs)
sink.WriteByte('&')
}
return sink.WriteByte('\n')
}

71
format/format.go Normal file

@ -0,0 +1,71 @@
// Copyright (C) 2022 The OKit Authors
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
// OR OTHER DEALINGS IN THE SOFTWARE.
package format
import (
"bufio"
"strings"
"time"
"github.com/golang/protobuf/jsonpb"
"go.pitz.tech/okit/pb"
)
// Marshaler defines an abstraction for writing entries to stdout/stderr. This is predominantly used in report log, trace,
// and event information from the process.
type Marshaler interface {
Marshal(writer *bufio.Writer, entry *pb.Entry) error
}
// JSON writes entries using JSON encoded protocol buffers.
type JSON struct {
Marshaler jsonpb.Marshaler
}
func (f JSON) Marshal(writer *bufio.Writer, entry *pb.Entry) error {
f.Marshaler.Marshal(writer, entry)
return writer.WriteByte('\n')
}
// Text writes entries using a custom text format. Entries are written using a TSV format and tags are url-encoded
// key-value pairs.
type Text struct{}
func (f Text) Marshal(writer *bufio.Writer, entry *pb.Entry) error {
writer.WriteString(entry.Timestamp.AsTime().Format(time.RFC3339))
writer.WriteByte('\t')
writer.WriteString(strings.ToUpper(entry.Kind.String()))
writer.WriteByte('\t')
writer.WriteString(entry.Scope)
writer.WriteByte('\t')
for _, tag := range entry.Tags {
qs := tag.AsQueryString()
if qs == "" {
continue
}
writer.WriteString(qs)
writer.WriteByte('&')
}
return writer.WriteByte('\n')
}

@ -38,7 +38,7 @@ func (e *Endpoint) Health(w http.ResponseWriter, r *http.Request) {
func (e *Endpoint) Metrics(w http.ResponseWriter, r *http.Request) { func (e *Endpoint) Metrics(w http.ResponseWriter, r *http.Request) {
// Render metrics endpoint // Render metrics endpoint
// Unlike prometheus metrics are pushed, not scraped. So there's no need for complicated protocol here. // Unlike prometheus, metrics are pushed and not scraped. So there's no need for a complicated protocol here.
} }
func (e *Endpoint) Trace(w http.ResponseWriter, r *http.Request) { func (e *Endpoint) Trace(w http.ResponseWriter, r *http.Request) {

@ -21,11 +21,19 @@
package http package http
import ( import (
"context"
"net/http" "net/http"
"go.pitz.tech/okit" "go.pitz.tech/okit"
) )
const (
// OkitTraceIDHeader is a string constant that defines a common key used to propagate an okit trace.
OkitTraceIDHeader = "x-okit-trace-id"
// OkitSpanIDHeader is a string constant that defines a common key used to propagate the parent span.
OkitSpanIDHeader = "x-okit-span-id"
)
// InstrumentClient updates the provided http.Client to use an instrumented http.RoundTripper. If the provided // InstrumentClient updates the provided http.Client to use an instrumented http.RoundTripper. If the provided
// client.Transport is nil, then http.DefaultTransport is used. // client.Transport is nil, then http.DefaultTransport is used.
func InstrumentClient(client *http.Client) { func InstrumentClient(client *http.Client) {
@ -46,16 +54,26 @@ type roundTripper struct {
} }
func (r *roundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) { func (r *roundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) {
ctx := req.Context()
statusCode := 0 statusCode := 0
tags := []okit.Tag{
ctx, done := okit.Trace(req.Context(),
okit.String("host", req.URL.Host), okit.String("host", req.URL.Host),
okit.String("method", req.Method), okit.String("method", req.Method),
okit.String("path", req.URL.Path), okit.String("path", req.URL.Path),
okit.Intp("status", &statusCode), okit.Intp("status", &statusCode),
okit.Errp(&err), }
)
defer done() defer okit.Trace(&ctx, tags...).Done()
// propagate to server
// TODO: support spec compliant propagation
if v := ctx.Value(okit.SpanKey); v != nil {
if span, ok := v.(okit.Span); ok {
req.Header.Set(OkitTraceIDHeader, span.TraceID)
req.Header.Set(OkitSpanIDHeader, span.ID)
}
}
resp, err = r.delegate.RoundTrip(req.WithContext(ctx)) resp, err = r.delegate.RoundTrip(req.WithContext(ctx))
if err != nil { if err != nil {
@ -79,15 +97,27 @@ type handler struct {
} }
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
statusCode := 200 ctx := r.Context()
ctx, done := okit.Trace(r.Context(), // propagation from client
traceID := r.Header.Get(OkitTraceIDHeader)
spanID := r.Header.Get(OkitSpanIDHeader)
if traceID != "" && spanID != "" {
ctx = context.WithValue(ctx, okit.SpanKey, okit.Span{
TraceID: traceID,
ID: spanID,
})
}
statusCode := 200
tags := []okit.Tag{
okit.String("host", r.URL.Host), okit.String("host", r.URL.Host),
okit.String("method", r.Method), okit.String("method", r.Method),
okit.String("path", r.URL.Path), okit.String("path", r.URL.Path),
okit.Intp("status", &statusCode), okit.Intp("status", &statusCode),
) }
defer done()
defer okit.Trace(&ctx, tags...).Done()
h.delegate.ServeHTTP( h.delegate.ServeHTTP(
&response{ &response{

81
observer/local.go Normal file

@ -0,0 +1,81 @@
// Copyright (C) 2022 The OKit Authors
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
// OR OTHER DEALINGS IN THE SOFTWARE.
package observer
import (
"bufio"
"io"
"os"
"go.pitz.tech/okit/format"
"go.pitz.tech/okit/pb"
)
// LocalOption defines a mechanism that allows properties of the observer to be overridden or configured.
type LocalOption func(l *local)
// Output configures the output of the local observer to point to the provided writer.
func Output(writer io.Writer) LocalOption {
return func(l *local) {
l.output = bufio.NewWriter(writer)
}
}
// Format configures how the information is written to the target writer.
func Format(fmt string) LocalOption {
return func(l *local) {
switch fmt {
case "json":
l.format = format.JSON{}
default:
l.format = format.Text{}
}
}
}
// Local creates an Observer that writes logs to an output stream using a configured format.
func Local(opts ...LocalOption) Observer {
l := &local{
output: bufio.NewWriter(os.Stdout),
format: format.Text{},
}
for _, o := range opts {
o(l)
}
return l
}
type local struct {
output *bufio.Writer
format format.Marshaler
}
func (l *local) Observe(entries []*pb.Entry) {
// TODO: how to handle internal errors
for _, entry := range entries {
_ = l.format.Marshal(l.output, entry)
}
_ = l.output.Flush()
}

39
observer/observer.go Normal file

@ -0,0 +1,39 @@
// Copyright (C) 2022 The OKit Authors
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
// OR OTHER DEALINGS IN THE SOFTWARE.
package observer
import (
"go.pitz.tech/okit/pb"
)
// Observer provides a mechanism to subscribe to entries within the okit system.
type Observer interface {
Observe(entries []*pb.Entry)
}
// Composite provides a way for multiple observers to be plugged into the system.
type Composite []Observer
func (c Composite) Observe(entries []*pb.Entry) {
for _, observer := range c {
observer.Observe(entries)
}
}

@ -21,19 +21,31 @@
package okit package okit
import ( import (
"bufio"
"context" "context"
"os"
) )
// DefaultClient provides a default client implementation. var (
var DefaultClient = NewClient() global chan Client
)
// DefaultFormat is used to format how data is written to stdout. func init() {
var DefaultFormat Format = TextFormat{} global = make(chan Client, 1)
global <- NewClient()
}
// TODO: make this configurable // Replace updates the default Client to be the provided one.
var sink = bufio.NewWriter(os.Stdout) func Replace(c Client) {
<-global
global <- c
}
// C returns a copy of the current default Client.
func C() Client {
c := <-global
global <- c
return c
}
// ContextKey is a holder structure that allows data to be attached to contexts. // ContextKey is a holder structure that allows data to be attached to contexts.
type ContextKey string type ContextKey string
@ -43,36 +55,40 @@ var SpanKey = ContextKey("okit.span")
// With returns a client containing additional tags. // With returns a client containing additional tags.
func With(tags ...Tag) Client { func With(tags ...Tag) Client {
return DefaultClient.WithCallerSkip(3).With(tags...) return C().With(tags...)
} }
// Emit emits an event with the provided tags. // Emit emits an event with the provided tags.
func Emit(event string, tags ...Tag) { func Emit(event string, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Emit(event, tags...) C().WithCallerSkip(3).Emit(event, tags...)
} }
// Observe reports a metric value with the provided tags. // Observe reports a metric value with the provided tags.
func Observe(metric string, value float64, tags ...Tag) { func Observe(metric string, value float64, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Observe(metric, value, tags...) C().WithCallerSkip(3).Observe(metric, value, tags...)
} }
// Trace traces a function call. It implements distributed tracing and bookend events. // Trace traces a function call. It implements distributed tracing and bookend events.
func Trace(ctx context.Context, tags ...Tag) (context.Context, DoneFunc) { func Trace(ctxp *context.Context, tags ...Tag) ActiveTrace {
return DefaultClient.WithCallerSkip(3).Trace(ctx, tags...) return C().WithCallerSkip(3).Trace(ctxp, tags...)
} }
// Debug writes a message at a debug level.
func Debug(msg string, tags ...Tag) { func Debug(msg string, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Debug(msg, tags...) C().WithCallerSkip(3).Debug(msg, tags...)
} }
// Info writes a message at a informational level.
func Info(msg string, tags ...Tag) { func Info(msg string, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Info(msg, tags...) C().WithCallerSkip(3).Info(msg, tags...)
} }
// Warn writes a message at a warning level.
func Warn(msg string, tags ...Tag) { func Warn(msg string, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Warn(msg, tags...) C().WithCallerSkip(3).Warn(msg, tags...)
} }
// Error writes a message at a error level.
func Error(msg string, tags ...Tag) { func Error(msg string, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Error(msg, tags...) C().WithCallerSkip(3).Error(msg, tags...)
} }

@ -385,13 +385,6 @@ type Entry struct {
Kind Kind `protobuf:"varint,3,opt,name=kind,proto3,enum=obkit.Kind" json:"kind,omitempty"` Kind Kind `protobuf:"varint,3,opt,name=kind,proto3,enum=obkit.Kind" json:"kind,omitempty"`
// Tags is an optional field that contains a list of metadata associated with the entry. // Tags is an optional field that contains a list of metadata associated with the entry.
Tags []*Tag `protobuf:"bytes,4,rep,name=tags,proto3" json:"tags,omitempty"` Tags []*Tag `protobuf:"bytes,4,rep,name=tags,proto3" json:"tags,omitempty"`
// Value is an optional field that defines a value associated with the entry (e.g. in the case of a log, metric, or trace).
//
// Types that are assignable to Value:
// *Entry_String_
// *Entry_Double
// *Entry_Duration
Value isEntry_Value `protobuf_oneof:"value"`
} }
func (x *Entry) Reset() { func (x *Entry) Reset() {
@ -454,56 +447,6 @@ func (x *Entry) GetTags() []*Tag {
return nil return nil
} }
func (m *Entry) GetValue() isEntry_Value {
if m != nil {
return m.Value
}
return nil
}
func (x *Entry) GetString_() string {
if x, ok := x.GetValue().(*Entry_String_); ok {
return x.String_
}
return ""
}
func (x *Entry) GetDouble() float64 {
if x, ok := x.GetValue().(*Entry_Double); ok {
return x.Double
}
return 0
}
func (x *Entry) GetDuration() *Duration {
if x, ok := x.GetValue().(*Entry_Duration); ok {
return x.Duration
}
return nil
}
type isEntry_Value interface {
isEntry_Value()
}
type Entry_String_ struct {
String_ string `protobuf:"bytes,5,opt,name=string,proto3,oneof"` // used for logs
}
type Entry_Double struct {
Double float64 `protobuf:"fixed64,6,opt,name=double,proto3,oneof"` // used for metrics
}
type Entry_Duration struct {
Duration *Duration `protobuf:"bytes,7,opt,name=duration,proto3,oneof"` // used for traces
}
func (*Entry_String_) isEntry_Value() {}
func (*Entry_Double) isEntry_Value() {}
func (*Entry_Duration) isEntry_Value() {}
// ClientInfo is used to capture specific information about the client application that is emitting information. // ClientInfo is used to capture specific information about the client application that is emitting information.
type ClientInfo struct { type ClientInfo struct {
state protoimpl.MessageState state protoimpl.MessageState
@ -647,7 +590,7 @@ var file_obkit_proto_rawDesc = []byte{
0x61, 0x6d, 0x70, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6f, 0x62, 0x6b, 0x69, 0x61, 0x6d, 0x70, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6f, 0x62, 0x6b, 0x69,
0x74, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x00, 0x52, 0x09, 0x74, 0x74, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x00, 0x52, 0x09, 0x74,
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x07, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x07, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
0x65, 0x22, 0xfa, 0x01, 0x0a, 0x05, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x2e, 0x0a, 0x09, 0x74, 0x65, 0x22, 0x8e, 0x01, 0x0a, 0x05, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x2e, 0x0a, 0x09, 0x74,
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10,
0x2e, 0x6f, 0x62, 0x6b, 0x69, 0x74, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x6f, 0x62, 0x6b, 0x69, 0x74, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70,
0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x73,
@ -656,29 +599,23 @@ var file_obkit_proto_rawDesc = []byte{
0x0b, 0x2e, 0x6f, 0x62, 0x6b, 0x69, 0x74, 0x2e, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x04, 0x6b, 0x69, 0x0b, 0x2e, 0x6f, 0x62, 0x6b, 0x69, 0x74, 0x2e, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x04, 0x6b, 0x69,
0x6e, 0x64, 0x12, 0x1e, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x6e, 0x64, 0x12, 0x1e, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x0a, 0x2e, 0x6f, 0x62, 0x6b, 0x69, 0x74, 0x2e, 0x54, 0x61, 0x67, 0x52, 0x04, 0x74, 0x61, 0x32, 0x0a, 0x2e, 0x6f, 0x62, 0x6b, 0x69, 0x74, 0x2e, 0x54, 0x61, 0x67, 0x52, 0x04, 0x74, 0x61,
0x67, 0x73, 0x12, 0x18, 0x0a, 0x06, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x18, 0x05, 0x20, 0x01, 0x67, 0x73, 0x22, 0x3c, 0x0a, 0x0a, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f,
0x28, 0x09, 0x48, 0x00, 0x52, 0x06, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x18, 0x0a, 0x06, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x01, 0x48, 0x00, 0x52, 0x06, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65,
0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x12, 0x2d, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65,
0x6f, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6f, 0x62, 0x6b, 0x69, 0x74, 0x22, 0x5b, 0x0a, 0x06, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x29, 0x0a, 0x06, 0x63, 0x6c,
0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52, 0x08, 0x64, 0x75, 0x72, 0x69, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6f, 0x62, 0x6b,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x07, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3c, 0x69, 0x74, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x06, 0x63,
0x0a, 0x0a, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x12, 0x26, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73,
0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x6f, 0x62, 0x6b, 0x69, 0x74, 0x2e, 0x45,
0x12, 0x1a, 0x0a, 0x08, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x2a, 0x3e, 0x0a,
0x28, 0x09, 0x52, 0x08, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x22, 0x5b, 0x0a, 0x06, 0x04, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e,
0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x29, 0x0a, 0x06, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x10, 0x01, 0x12, 0x0a, 0x0a,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6f, 0x62, 0x6b, 0x69, 0x74, 0x2e, 0x43, 0x06, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x72, 0x61,
0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x06, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x63, 0x65, 0x10, 0x03, 0x12, 0x07, 0x0a, 0x03, 0x4c, 0x6f, 0x67, 0x10, 0x04, 0x42, 0x1c, 0x5a,
0x74, 0x12, 0x26, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x1a, 0x63, 0x6f, 0x64, 0x65, 0x2e, 0x70, 0x69, 0x74, 0x7a, 0x2e, 0x74, 0x65, 0x63, 0x68, 0x2f,
0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x6f, 0x62, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x6d, 0x79, 0x61, 0x2f, 0x6f, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x2a, 0x3e, 0x0a, 0x04, 0x4b, 0x69, 0x6e, 0x74, 0x6f, 0x33,
0x64, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00, 0x12, 0x09,
0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x4d, 0x65, 0x74,
0x72, 0x69, 0x63, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x72, 0x61, 0x63, 0x65, 0x10, 0x03,
0x12, 0x07, 0x0a, 0x03, 0x4c, 0x6f, 0x67, 0x10, 0x04, 0x42, 0x1c, 0x5a, 0x1a, 0x63, 0x6f, 0x64,
0x65, 0x2e, 0x70, 0x69, 0x74, 0x7a, 0x2e, 0x74, 0x65, 0x63, 0x68, 0x2f, 0x6d, 0x79, 0x61, 0x2f,
0x6f, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@ -710,14 +647,13 @@ var file_obkit_proto_depIdxs = []int32{
2, // 2: obkit.Entry.timestamp:type_name -> obkit.Timestamp 2, // 2: obkit.Entry.timestamp:type_name -> obkit.Timestamp
0, // 3: obkit.Entry.kind:type_name -> obkit.Kind 0, // 3: obkit.Entry.kind:type_name -> obkit.Kind
3, // 4: obkit.Entry.tags:type_name -> obkit.Tag 3, // 4: obkit.Entry.tags:type_name -> obkit.Tag
1, // 5: obkit.Entry.duration:type_name -> obkit.Duration 5, // 5: obkit.Packet.client:type_name -> obkit.ClientInfo
5, // 6: obkit.Packet.client:type_name -> obkit.ClientInfo 4, // 6: obkit.Packet.entries:type_name -> obkit.Entry
4, // 7: obkit.Packet.entries:type_name -> obkit.Entry 7, // [7:7] is the sub-list for method output_type
8, // [8:8] is the sub-list for method output_type 7, // [7:7] is the sub-list for method input_type
8, // [8:8] is the sub-list for method input_type 7, // [7:7] is the sub-list for extension type_name
8, // [8:8] is the sub-list for extension type_name 7, // [7:7] is the sub-list for extension extendee
8, // [8:8] is the sub-list for extension extendee 0, // [0:7] is the sub-list for field type_name
0, // [0:8] is the sub-list for field type_name
} }
func init() { file_obkit_proto_init() } func init() { file_obkit_proto_init() }
@ -808,11 +744,6 @@ func file_obkit_proto_init() {
(*Tag_Duration)(nil), (*Tag_Duration)(nil),
(*Tag_Timestamp)(nil), (*Tag_Timestamp)(nil),
} }
file_obkit_proto_msgTypes[3].OneofWrappers = []interface{}{
(*Entry_String_)(nil),
(*Entry_Double)(nil),
(*Entry_Duration)(nil),
}
type x struct{} type x struct{}
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{ File: protoimpl.DescBuilder{

@ -29,20 +29,24 @@ import (
//go:generate protoc --go_out=paths=source_relative:. -I=. obkit.proto //go:generate protoc --go_out=paths=source_relative:. -I=. obkit.proto
// AsDuration converts a protocol level Duration to a Golang time.Duration.
func (x *Duration) AsDuration() time.Duration { func (x *Duration) AsDuration() time.Duration {
return time.Duration(x.Nanos) return time.Duration(x.Nanos)
} }
// DurationPB converts a time.Duration to a protocol level Duration.
func DurationPB(d time.Duration) *Duration { func DurationPB(d time.Duration) *Duration {
return &Duration{ return &Duration{
Nanos: int64(d), Nanos: int64(d),
} }
} }
// AsTime converts a protocol level Timestamp to a Golang time.Time.
func (x *Timestamp) AsTime() time.Time { func (x *Timestamp) AsTime() time.Time {
return time.Unix(x.Seconds, int64(x.Nanos)) return time.Unix(x.Seconds, int64(x.Nanos))
} }
// TimestampPB converts a time.Time to a protocol level Timestamp.
func TimestampPB(t time.Time) *Timestamp { func TimestampPB(t time.Time) *Timestamp {
return &Timestamp{ return &Timestamp{
Seconds: t.Unix(), Seconds: t.Unix(),
@ -50,7 +54,7 @@ func TimestampPB(t time.Time) *Timestamp {
} }
} }
// AsQueryString converts the associated tag to a URI query string // AsQueryString converts the associated tag to a URI query string.
func (x *Tag) AsQueryString() string { func (x *Tag) AsQueryString() string {
val := "" val := ""

@ -79,13 +79,6 @@ message Entry {
// Tags is an optional field that contains a list of metadata associated with the entry. // Tags is an optional field that contains a list of metadata associated with the entry.
repeated Tag tags = 4; repeated Tag tags = 4;
// Value is an optional field that defines a value associated with the entry (e.g. in the case of a log, metric, or trace).
oneof value {
string string = 5; // used for logs
double double = 6; // used for metrics
Duration duration = 7; // used for traces
}
} }
// ClientInfo is used to capture specific information about the client application that is emitting information. // ClientInfo is used to capture specific information about the client application that is emitting information.
@ -100,9 +93,12 @@ message ClientInfo {
// Packet defines the set of data that is contained within each message sent to the server. // Packet defines the set of data that is contained within each message sent to the server.
message Packet { message Packet {
// Id provides an id for the packet so the server can deduplicate packets of information being sent to them.
bytes id = 1;
// Client contains information that identifies the emitting client. // Client contains information that identifies the emitting client.
ClientInfo client = 1; ClientInfo client = 2;
// Entries are processed in batches, allowing local clients to buffer if necessary. // Entries are processed in batches, allowing local clients to buffer if necessary.
repeated Entry entries = 2; repeated Entry entries = 3;
} }

@ -25,3 +25,9 @@ func Err(err error) Tag { return Tag{"err", err} }
// Errp returns a Tag whose value is read after submission. // Errp returns a Tag whose value is read after submission.
func Errp(err *error) Tag { return Tag{"err", err} } func Errp(err *error) Tag { return Tag{"err", err} }
// level returns a tag that contains the string representation of a level.
func level(level Level) Tag { return Tag{"level", (&level).String()} }
// message returns a tag containing message string to index.
func message(msg string) Tag { return Tag{"msg", msg} }